1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
This is the thread that interacts with the host and with libvirt to manage VMs.
One thread is launched per host.
28 __author__
= "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__
= "$10-jul-2014 12:07:15$"
37 from jsonschema
import validate
as js_v
, exceptions
as js_e
40 from vim_schema
import localinfo_schema
, hostinfo_schema
42 #from logging import Logger
43 #import auxiliary_functions as af
45 #TODO: insert a logging system
49 #lvirt_module=None #libvirt module is charged only if not in test mode
51 class host_thread(threading
.Thread
):
53 def __init__(self
, name
, host
, user
, db
, db_lock
, test
, image_path
, host_id
, version
, develop_mode
, develop_bridge_iface
):
58 'host','user': host ip or name to manage and user
59 'db', 'db_lock': database class and lock to use it in exclusion
61 threading
.Thread
.__init
__(self
)
66 self
.db_lock
= db_lock
69 if not test
and not host_thread
.lvirt_module
:
71 module_info
= imp
.find_module("libvirt")
72 host_thread
.lvirt_module
= imp
.load_module("libvirt", *module_info
)
73 except (IOError, ImportError) as e
:
74 raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e
))
77 self
.develop_mode
= develop_mode
78 self
.develop_bridge_iface
= develop_bridge_iface
79 self
.image_path
= image_path
80 self
.host_id
= host_id
81 self
.version
= version
86 self
.server_status
= {} #dictionary with pairs server_uuid:server_status
87 self
.pending_terminate_server
=[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
88 self
.next_update_server_status
= 0 #time when must be check servers status
92 self
.queueLock
= threading
.Lock()
93 self
.taskQueue
= Queue
.Queue(2000)
96 def ssh_connect(self
):
99 self
.ssh_conn
= paramiko
.SSHClient()
100 self
.ssh_conn
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
101 self
.ssh_conn
.load_system_host_keys()
102 self
.ssh_conn
.connect(self
.host
, username
=self
.user
, timeout
=10) #, None)
103 except paramiko
.ssh_exception
.SSHException
as e
:
105 print self
.name
, ": ssh_connect ssh Exception:", text
107 def load_localinfo(self
):
113 command
= 'mkdir -p ' + self
.image_path
114 #print self.name, ': command:', command
115 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
116 content
= stderr
.read()
118 print self
.name
, ': command:', command
, "stderr:", content
120 command
= 'cat ' + self
.image_path
+ '/.openvim.yaml'
121 #print self.name, ': command:', command
122 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
123 content
= stdout
.read()
124 if len(content
) == 0:
125 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
126 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
127 self
.localinfo
= yaml
.load(content
)
128 js_v(self
.localinfo
, localinfo_schema
)
129 self
.localinfo_dirty
=False
130 if 'server_files' not in self
.localinfo
:
131 self
.localinfo
['server_files'] = {}
132 print self
.name
, ': localinfo load from host'
135 except paramiko
.ssh_exception
.SSHException
as e
:
137 print self
.name
, ": load_localinfo ssh Exception:", text
138 except host_thread
.lvirt_module
.libvirtError
as e
:
139 text
= e
.get_error_message()
140 print self
.name
, ": load_localinfo libvirt Exception:", text
141 except yaml
.YAMLError
as exc
:
143 if hasattr(exc
, 'problem_mark'):
144 mark
= exc
.problem_mark
145 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
146 print self
.name
, ": load_localinfo yaml format Exception", text
147 except js_e
.ValidationError
as e
:
149 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
150 print self
.name
, ": load_localinfo format Exception:", text
, e
.message
151 except Exception as e
:
153 print self
.name
, ": load_localinfo Exception:", text
155 #not loaded, insert a default data and force saving by activating dirty flag
156 self
.localinfo
= {'files':{}, 'server_files':{} }
157 #self.localinfo_dirty=True
158 self
.localinfo_dirty
=False
160 def load_hostinfo(self
):
168 command
= 'cat ' + self
.image_path
+ '/hostinfo.yaml'
169 #print self.name, ': command:', command
170 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
171 content
= stdout
.read()
172 if len(content
) == 0:
173 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
174 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
175 self
.hostinfo
= yaml
.load(content
)
176 js_v(self
.hostinfo
, hostinfo_schema
)
177 print self
.name
, ': hostlinfo load from host', self
.hostinfo
180 except paramiko
.ssh_exception
.SSHException
as e
:
182 print self
.name
, ": load_hostinfo ssh Exception:", text
183 except host_thread
.lvirt_module
.libvirtError
as e
:
184 text
= e
.get_error_message()
185 print self
.name
, ": load_hostinfo libvirt Exception:", text
186 except yaml
.YAMLError
as exc
:
188 if hasattr(exc
, 'problem_mark'):
189 mark
= exc
.problem_mark
190 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
191 print self
.name
, ": load_hostinfo yaml format Exception", text
192 except js_e
.ValidationError
as e
:
194 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
195 print self
.name
, ": load_hostinfo format Exception:", text
, e
.message
196 except Exception as e
:
198 print self
.name
, ": load_hostinfo Exception:", text
200 #not loaded, insert a default data
203 def save_localinfo(self
, tries
=3):
205 self
.localinfo_dirty
= False
212 command
= 'cat > ' + self
.image_path
+ '/.openvim.yaml'
213 print self
.name
, ': command:', command
214 (stdin
, _
, _
) = self
.ssh_conn
.exec_command(command
)
215 yaml
.safe_dump(self
.localinfo
, stdin
, explicit_start
=True, indent
=4, default_flow_style
=False, tags
=False, encoding
='utf-8', allow_unicode
=True)
216 self
.localinfo_dirty
= False
219 except paramiko
.ssh_exception
.SSHException
as e
:
221 print self
.name
, ": save_localinfo ssh Exception:", text
222 if "SSH session not active" in text
:
224 except host_thread
.lvirt_module
.libvirtError
as e
:
225 text
= e
.get_error_message()
226 print self
.name
, ": save_localinfo libvirt Exception:", text
227 except yaml
.YAMLError
as exc
:
229 if hasattr(exc
, 'problem_mark'):
230 mark
= exc
.problem_mark
231 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
232 print self
.name
, ": save_localinfo yaml format Exception", text
233 except Exception as e
:
235 print self
.name
, ": save_localinfo Exception:", text
237 def load_servers_from_db(self
):
238 self
.db_lock
.acquire()
239 r
,c
= self
.db
.get_table(SELECT
=('uuid','status', 'image_id'), FROM
='instances', WHERE
={'host_id': self
.host_id
})
240 self
.db_lock
.release()
242 self
.server_status
= {}
244 print self
.name
, ": Error getting data from database:", c
247 self
.server_status
[ server
['uuid'] ] = server
['status']
249 #convert from old version to new one
250 if 'inc_files' in self
.localinfo
and server
['uuid'] in self
.localinfo
['inc_files']:
251 server_files_dict
= {'source file': self
.localinfo
['inc_files'][ server
['uuid'] ] [0], 'file format':'raw' }
252 if server_files_dict
['source file'][-5:] == 'qcow2':
253 server_files_dict
['file format'] = 'qcow2'
255 self
.localinfo
['server_files'][ server
['uuid'] ] = { server
['image_id'] : server_files_dict
}
256 if 'inc_files' in self
.localinfo
:
257 del self
.localinfo
['inc_files']
258 self
.localinfo_dirty
= True
260 def delete_unused_files(self
):
261 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
262 Deletes unused entries at self.loacalinfo and the corresponding local files.
263 The only reason for this mismatch is the manual deletion of instances (VM) at database
267 for uuid
,images
in self
.localinfo
['server_files'].items():
268 if uuid
not in self
.server_status
:
269 for localfile
in images
.values():
271 print self
.name
, ": deleting file '%s' of unused server '%s'" %(localfile
['source file'], uuid
)
272 self
.delete_file(localfile
['source file'])
273 except paramiko
.ssh_exception
.SSHException
as e
:
274 print self
.name
, ": Exception deleting file '%s': %s" %(localfile
['source file'], str(e
))
275 del self
.localinfo
['server_files'][uuid
]
276 self
.localinfo_dirty
= True
def insert_task(self, task, *aditional):
    """Queue a task to be processed by this host thread.

    The task name and the extra positional arguments are packed into one
    tuple, ``(task,) + aditional``, and put on ``self.taskQueue`` while
    ``self.queueLock`` is held.

    :param task: first element of the queued tuple (the task name)
    :param aditional: remaining elements of the queued tuple
    :return: ``(1, None)`` when the task was queued, or
        ``(-1, "timeout inserting a task over host <name>")`` when the queue
        was still full after the 5 second timeout
    """
    try:
        # The 'with' block releases the lock even when put() raises; the
        # previous acquire()/release() pair left it held forever on timeout.
        with self.queueLock:
            self.taskQueue.put((task,) + aditional, timeout=5)
    except Queue.Full:
        return -1, "timeout inserting a task over host " + self.name
    # NOTE(review): success value mirrors the (-1, message) failure
    # convention; confirm against callers.
    return 1, None
289 self
.load_localinfo()
291 self
.load_servers_from_db()
292 self
.delete_unused_files()
294 self
.queueLock
.acquire()
295 if not self
.taskQueue
.empty():
296 task
= self
.taskQueue
.get()
299 self
.queueLock
.release()
303 if self
.localinfo_dirty
:
304 self
.save_localinfo()
305 elif self
.next_update_server_status
< now
:
306 self
.update_servers_status()
307 self
.next_update_server_status
= now
+ 5
308 elif len(self
.pending_terminate_server
)>0 and self
.pending_terminate_server
[0][0]<now
:
309 self
.server_forceoff()
314 if task
[0] == 'instance':
315 print self
.name
, ": processing task instance", task
[1]['action']
319 r
=self
.action_on_server(task
[1], retry
==2)
322 elif task
[0] == 'image':
324 elif task
[0] == 'exit':
325 print self
.name
, ": processing task exit"
328 elif task
[0] == 'reload':
329 print self
.name
, ": processing task reload terminating and relaunching"
332 elif task
[0] == 'edit-iface':
333 print self
.name
, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task
[1], task
[2], task
[3])
334 self
.edit_iface(task
[1], task
[2], task
[3])
335 elif task
[0] == 'restore-iface':
336 print self
.name
, ": processing task restore-iface %s mac=%s" % (task
[1], task
[2])
337 self
.restore_iface(task
[1], task
[2])
338 elif task
[0] == 'new-ovsbridge':
339 print self
.name
, ": Creating compute OVS bridge"
340 self
.create_ovs_bridge()
342 elif task
[0] == 'new-vxlan':
343 print self
.name
, ": Creating vxlan tunnel=" + task
[1] + ", remote ip=" + task
[2]
344 self
.create_ovs_vxlan_tunnel(task
[1], task
[2])
346 elif task
[0] == 'del-ovsbridge':
347 print self
.name
, ": Deleting OVS bridge"
348 self
.delete_ovs_bridge()
350 elif task
[0] == 'del-vxlan':
351 print self
.name
, ": Deleting vxlan " + task
[1] + " tunnel"
352 self
.delete_ovs_vxlan_tunnel(task
[1])
354 elif task
[0] == 'create-ovs-bridge-port':
355 print self
.name
, ": Adding port ovim-" + task
[1] + " to OVS bridge"
356 self
.create_ovs_bridge_port(task
[1])
357 elif task
[0] == 'del-ovs-port':
358 self
.delete_bridge_port_attached_to_ovs(task
[1], task
[2])
360 print self
.name
, ": unknown task", task
362 def server_forceoff(self
, wait_until_finished
=False):
363 while len(self
.pending_terminate_server
)>0:
365 if self
.pending_terminate_server
[0][0]>now
:
366 if wait_until_finished
:
371 req
={'uuid':self
.pending_terminate_server
[0][1],
372 'action':{'terminate':'force'},
375 self
.action_on_server(req
)
376 self
.pending_terminate_server
.pop(0)
380 self
.server_forceoff(True)
381 if self
.localinfo_dirty
:
382 self
.save_localinfo()
384 self
.ssh_conn
.close()
385 except Exception as e
:
387 print self
.name
, ": terminate Exception:", text
388 print self
.name
, ": exit from host_thread"
def get_local_iface_name(self, generic_name):
    """Translate a generic interface name into the name used on this host.

    The translation table is the "iface_names" entry of ``self.hostinfo``.

    :param generic_name: interface name to translate
    :return: the mapped name when ``self.hostinfo`` is not None, contains
        "iface_names" and that mapping has ``generic_name``; otherwise
        ``generic_name`` itself, unchanged
    """
    hostinfo = self.hostinfo
    if hostinfo is not None and "iface_names" in hostinfo:
        iface_names = hostinfo["iface_names"]
        if generic_name in iface_names:
            return iface_names[generic_name]
    # No translation available: callers concatenate the result into XML
    # text, so hand back the name that was asked for.
    return generic_name
395 def create_xml_server(self
, server
, dev_list
, server_metadata
={}):
396 """Function that implements the generation of the VM XML definition.
397 Additional devices are in dev_list list
398 The main disk is upon dev_list[0]"""
400 #get if operating system is Windows
402 os_type
= server_metadata
.get('os_type', None)
403 if os_type
== None and 'metadata' in dev_list
[0]:
404 os_type
= dev_list
[0]['metadata'].get('os_type', None)
405 if os_type
!= None and os_type
.lower() == "windows":
407 #get type of hard disk bus
408 bus_ide
= True if windows_os
else False
409 bus
= server_metadata
.get('bus', None)
410 if bus
== None and 'metadata' in dev_list
[0]:
411 bus
= dev_list
[0]['metadata'].get('bus', None)
413 bus_ide
= True if bus
=='ide' else False
417 text
= "<domain type='kvm'>"
419 topo
= server_metadata
.get('topology', None)
420 if topo
== None and 'metadata' in dev_list
[0]:
421 topo
= dev_list
[0]['metadata'].get('topology', None)
423 name
= server
.get('name','') + "_" + server
['uuid']
424 name
= name
[:58] #qemu impose a length limit of 59 chars or not start. Using 58
425 text
+= self
.inc_tab() + "<name>" + name
+ "</name>"
427 text
+= self
.tab() + "<uuid>" + server
['uuid'] + "</uuid>"
430 if 'extended' in server
and server
['extended']!=None and 'numas' in server
['extended']:
431 numa
= server
['extended']['numas'][0]
434 memory
= int(numa
.get('memory',0))*1024*1024 #in KiB
436 memory
= int(server
['ram'])*1024;
438 if not self
.develop_mode
:
441 return -1, 'No memory assigned to instance'
443 text
+= self
.tab() + "<memory unit='KiB'>" +memory
+"</memory>"
444 text
+= self
.tab() + "<currentMemory unit='KiB'>" +memory
+ "</currentMemory>"
446 text
+= self
.tab()+'<memoryBacking>'+ \
447 self
.inc_tab() + '<hugepages/>'+ \
448 self
.dec_tab()+ '</memoryBacking>'
451 use_cpu_pinning
=False
452 vcpus
= int(server
.get("vcpus",0))
454 if 'cores-source' in numa
:
456 for index
in range(0, len(numa
['cores-source'])):
457 cpu_pinning
.append( [ numa
['cores-id'][index
], numa
['cores-source'][index
] ] )
459 if 'threads-source' in numa
:
461 for index
in range(0, len(numa
['threads-source'])):
462 cpu_pinning
.append( [ numa
['threads-id'][index
], numa
['threads-source'][index
] ] )
464 if 'paired-threads-source' in numa
:
466 for index
in range(0, len(numa
['paired-threads-source'])):
467 cpu_pinning
.append( [numa
['paired-threads-id'][index
][0], numa
['paired-threads-source'][index
][0] ] )
468 cpu_pinning
.append( [numa
['paired-threads-id'][index
][1], numa
['paired-threads-source'][index
][1] ] )
471 if use_cpu_pinning
and not self
.develop_mode
:
472 text
+= self
.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning
)) +"</vcpu>" + \
473 self
.tab()+'<cputune>'
475 for i
in range(0, len(cpu_pinning
)):
476 text
+= self
.tab() + "<vcpupin vcpu='" +str(cpu_pinning
[i
][0])+ "' cpuset='" +str(cpu_pinning
[i
][1]) +"'/>"
477 text
+= self
.dec_tab()+'</cputune>'+ \
478 self
.tab() + '<numatune>' +\
479 self
.inc_tab() + "<memory mode='strict' nodeset='" +str(numa
['source'])+ "'/>" +\
480 self
.dec_tab() + '</numatune>'
483 return -1, "Instance without number of cpus"
484 text
+= self
.tab()+"<vcpu>" + str(vcpus
) + "</vcpu>"
489 if dev
['type']=='cdrom' :
492 text
+= self
.tab()+ '<os>' + \
493 self
.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
495 text
+= self
.tab() + "<boot dev='cdrom'/>"
496 text
+= self
.tab() + "<boot dev='hd'/>" + \
497 self
.dec_tab()+'</os>'
499 text
+= self
.tab()+'<features>'+\
500 self
.inc_tab()+'<acpi/>' +\
501 self
.tab()+'<apic/>' +\
502 self
.tab()+'<pae/>'+ \
503 self
.dec_tab() +'</features>'
504 if windows_os
or topo
=="oneSocket":
505 text
+= self
.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
507 text
+= self
.tab() + "<cpu mode='host-model'></cpu>"
508 text
+= self
.tab() + "<clock offset='utc'/>" +\
509 self
.tab() + "<on_poweroff>preserve</on_poweroff>" + \
510 self
.tab() + "<on_reboot>restart</on_reboot>" + \
511 self
.tab() + "<on_crash>restart</on_crash>"
512 text
+= self
.tab() + "<devices>" + \
513 self
.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
514 self
.tab() + "<serial type='pty'>" +\
515 self
.inc_tab() + "<target port='0'/>" + \
516 self
.dec_tab() + "</serial>" +\
517 self
.tab() + "<console type='pty'>" + \
518 self
.inc_tab()+ "<target type='serial' port='0'/>" + \
519 self
.dec_tab()+'</console>'
521 text
+= self
.tab() + "<controller type='usb' index='0'/>" + \
522 self
.tab() + "<controller type='ide' index='0'/>" + \
523 self
.tab() + "<input type='mouse' bus='ps2'/>" + \
524 self
.tab() + "<sound model='ich6'/>" + \
525 self
.tab() + "<video>" + \
526 self
.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
527 self
.dec_tab() + "</video>" + \
528 self
.tab() + "<memballoon model='virtio'/>" + \
529 self
.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
531 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
532 #> self.dec_tab()+'</hostdev>\n' +\
533 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
535 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
537 #If image contains 'GRAPH' include graphics
538 #if 'GRAPH' in image:
539 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
540 self
.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
541 self
.dec_tab() + "</graphics>"
545 bus_ide_dev
= bus_ide
546 if dev
['type']=='cdrom' or dev
['type']=='disk':
547 if dev
['type']=='cdrom':
549 text
+= self
.tab() + "<disk type='file' device='"+dev
['type']+"'>"
550 if 'file format' in dev
:
551 text
+= self
.inc_tab() + "<driver name='qemu' type='" +dev
['file format']+ "' cache='writethrough'/>"
552 if 'source file' in dev
:
553 text
+= self
.tab() + "<source file='" +dev
['source file']+ "'/>"
554 #elif v['type'] == 'block':
555 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
557 # return -1, 'Unknown disk type ' + v['type']
558 vpci
= dev
.get('vpci',None)
560 vpci
= dev
['metadata'].get('vpci',None)
561 text
+= self
.pci2xml(vpci
)
564 text
+= self
.tab() + "<target dev='hd" +vd_index
+ "' bus='ide'/>" #TODO allows several type of disks
566 text
+= self
.tab() + "<target dev='vd" +vd_index
+ "' bus='virtio'/>"
567 text
+= self
.dec_tab() + '</disk>'
568 vd_index
= chr(ord(vd_index
)+1)
569 elif dev
['type']=='xml':
570 dev_text
= dev
['xml']
572 dev_text
= dev_text
.replace('__vpci__', dev
['vpci'])
573 if 'source file' in dev
:
574 dev_text
= dev_text
.replace('__file__', dev
['source file'])
575 if 'file format' in dev
:
576 dev_text
= dev_text
.replace('__format__', dev
['source file'])
577 if '__dev__' in dev_text
:
578 dev_text
= dev_text
.replace('__dev__', vd_index
)
579 vd_index
= chr(ord(vd_index
)+1)
582 return -1, 'Unknown device type ' + dev
['type']
585 bridge_interfaces
= server
.get('networks', [])
586 for v
in bridge_interfaces
:
588 self
.db_lock
.acquire()
589 result
, content
= self
.db
.get_table(FROM
='nets', SELECT
=('provider',),WHERE
={'uuid':v
['net_id']} )
590 self
.db_lock
.release()
592 print "create_xml_server ERROR getting nets",result
, content
594 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
595 #I know it is not secure
596 #for v in sorted(desc['network interfaces'].itervalues()):
597 model
= v
.get("model", None)
598 if content
[0]['provider']=='default':
599 text
+= self
.tab() + "<interface type='network'>" + \
600 self
.inc_tab() + "<source network='" +content
[0]['provider']+ "'/>"
601 elif content
[0]['provider'][0:7]=='macvtap':
602 text
+= self
.tab()+"<interface type='direct'>" + \
603 self
.inc_tab() + "<source dev='" + self
.get_local_iface_name(content
[0]['provider'][8:]) + "' mode='bridge'/>" + \
604 self
.tab() + "<target dev='macvtap0'/>"
606 text
+= self
.tab() + "<alias name='net" + str(net_nb
) + "'/>"
609 elif content
[0]['provider'][0:6]=='bridge':
610 text
+= self
.tab() + "<interface type='bridge'>" + \
611 self
.inc_tab()+"<source bridge='" +self
.get_local_iface_name(content
[0]['provider'][7:])+ "'/>"
613 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
614 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
617 elif content
[0]['provider'][0:3] == "OVS":
618 vlan
= content
[0]['provider'].replace('OVS:', '')
619 text
+= self
.tab() + "<interface type='bridge'>" + \
620 self
.inc_tab() + "<source bridge='ovim-" + vlan
+ "'/>"
622 return -1, 'Unknown Bridge net provider ' + content
[0]['provider']
624 text
+= self
.tab() + "<model type='" +model
+ "'/>"
625 if v
.get('mac_address', None) != None:
626 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
627 text
+= self
.pci2xml(v
.get('vpci',None))
628 text
+= self
.dec_tab()+'</interface>'
632 interfaces
= numa
.get('interfaces', [])
636 if self
.develop_mode
: #map these interfaces to bridges
637 text
+= self
.tab() + "<interface type='bridge'>" + \
638 self
.inc_tab()+"<source bridge='" +self
.develop_bridge_iface
+ "'/>"
640 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
641 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
643 text
+= self
.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
644 if v
.get('mac_address', None) != None:
645 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
646 text
+= self
.pci2xml(v
.get('vpci',None))
647 text
+= self
.dec_tab()+'</interface>'
650 if v
['dedicated'] == 'yes': #passthrought
651 text
+= self
.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
652 self
.inc_tab() + "<source>"
654 text
+= self
.pci2xml(v
['source'])
655 text
+= self
.dec_tab()+'</source>'
656 text
+= self
.pci2xml(v
.get('vpci',None))
658 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
659 text
+= self
.dec_tab()+'</hostdev>'
661 else: #sriov_interfaces
662 #skip not connected interfaces
663 if v
.get("net_id") == None:
665 text
+= self
.tab() + "<interface type='hostdev' managed='yes'>"
667 if v
.get('mac_address', None) != None:
668 text
+= self
.tab() + "<mac address='" +v
['mac_address']+ "'/>"
669 text
+= self
.tab()+'<source>'
671 text
+= self
.pci2xml(v
['source'])
672 text
+= self
.dec_tab()+'</source>'
673 if v
.get('vlan',None) != None:
674 text
+= self
.tab() + "<vlan> <tag id='" + str(v
['vlan']) + "'/> </vlan>"
675 text
+= self
.pci2xml(v
.get('vpci',None))
677 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
678 text
+= self
.dec_tab()+'</interface>'
681 text
+= self
.dec_tab()+'</devices>'+\
682 self
.dec_tab()+'</domain>'
def pci2xml(self, pci):
    '''Generate the xml <address type='pci' .../> element for a pci address.

    pci: text with the format XXXX:XX:XX.X; the pieces around ':' and '.'
        are used as domain, bus, slot and function. None or an empty text
        is allowed.
    Return: the element preceded by the indentation given by self.tab(),
        or an empty text when no pci address is provided
    '''
    # An empty text has no ':' separated fields, so it must be filtered here
    # together with None instead of reaching the indexing below.
    if not pci:
        return ""
    colon_fields = pci.split(':')
    slot_function = colon_fields[2].split('.')
    return self.tab() + "<address type='pci' domain='0x" + colon_fields[0] + \
        "' bus='0x" + colon_fields[1] + "' slot='0x" + slot_function[0] + \
        "' function='0x" + slot_function[1] + "'/>"
697 """Return indentation according to xml_level"""
698 return "\n" + (' '*self
.xml_level
)
701 """Increment and return indentation according to xml_level"""
706 """Decrement and return indentation according to xml_level"""
def create_ovs_bridge(self):
    """
    Run 'ovs-vsctl --may-exist add-br br-int' with STP enabled on the compute node
    :return: True if the command printed nothing on stdout, False otherwise
    """
    ovs_cmd = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
    print("%s %s %s" % (self.name, ': command:', ovs_cmd))
    (_, out_stream, _) = self.ssh_conn.exec_command(ovs_cmd)
    printed = out_stream.read()
    # Any output on stdout is taken as a failure of the command.
    return len(printed) == 0
724 def delete_port_to_ovs_bridge(self
, vlan
, net_uuid
):
726 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
727 :param vlan: vlan port id
728 :param net_uuid: network id
732 command
= 'sudo ovs-vsctl del-port br-int ovim-' + vlan
733 print self
.name
, ': command:', command
734 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
735 content
= stdout
.read()
736 if len(content
) == 0:
741 def is_port_free(self
, vlan
, net_uuid
):
743 Check if por is free before delete from the compute.
744 :param vlan: vlan port id
745 :param net_uuid: network id
746 :return: True if is not free
748 self
.db_lock
.acquire()
749 result
, content
= self
.db
.get_table(
750 FROM
='ports as p join instances as i on p.instance_id=i.uuid',
751 WHERE
={"i.host_id": self
.host_id
, 'p.type': 'instance:bridge', 'p.net_id': net_uuid
}
753 self
.db_lock
.release()
760 def add_port_to_ovs_bridge(self
, vlan
):
762 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
763 :param vlan: vlan port id
766 command
= 'sudo ovs-vsctl add-port br-int ovim-' + vlan
+ ' tag=' + vlan
767 print self
.name
, ': command:', command
768 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
769 content
= stdout
.read()
770 if len(content
) == 0:
775 def delete_bridge_port_attached_to_ovs(self
, vlan
, net_uuid
):
777 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
780 :return: True if success
782 if not self
.is_port_free(vlan
, net_uuid
):
784 self
.delete_port_to_ovs_bridge(vlan
, net_uuid
)
785 self
.delete_linux_bridge(vlan
)
def delete_linux_bridge(self, vlan):
    """
    Bring down and delete the linux bridge 'ovim-<vlan>' in a specific compute.
    :param vlan: vlan port id, appended to the bridge name
    :return: True if the command printed nothing on stdout, False otherwise
    """
    shell_cmd = ''.join(['sudo ifconfig ovim-', vlan, ' down && sudo brctl delbr ovim-', vlan])
    print("%s %s %s" % (self.name, ': command:', shell_cmd))
    (_, out_stream, _) = self.ssh_conn.exec_command(shell_cmd)
    printed = out_stream.read()
    return True if len(printed) == 0 else False
def create_ovs_bridge_port(self, vlan):
    """
    Generate a linux bridge and attach it as a port to an OVS bridge.
    Calls create_linux_bridge(vlan) and then add_port_to_ovs_bridge(vlan);
    the result of neither call is checked.
    :param vlan: vlan port id
    :return:
    """
    self.create_linux_bridge(vlan)
    self.add_port_to_ovs_bridge(vlan)
812 def create_linux_bridge(self
, vlan
):
814 Create a linux bridge with STP active
815 :param vlan: netowrk vlan id
818 command
= 'sudo brctl addbr ovim-' + vlan
+ ' && sudo ifconfig ovim-' + vlan
+ ' up'
819 print self
.name
, ': command:', command
820 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
821 content
= stdout
.read()
823 if len(content
) != 0:
826 command
= 'sudo brctl stp ovim-' + vlan
+ ' on'
827 print self
.name
, ': command:', command
828 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
829 content
= stdout
.read()
831 if len(content
) == 0:
836 def create_ovs_vxlan_tunnel(self
, vxlan_interface
, remote_ip
):
838 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
839 :param vxlan_interface: vlxan inteface name.
840 :param remote_ip: tunnel endpoint remote compute ip.
843 command
= 'sudo ovs-vsctl add-port br-int ' + vxlan_interface
+ \
844 ' -- set Interface ' + vxlan_interface
+ ' type=vxlan options:remote_ip=' + remote_ip
+ \
845 ' -- set Port ' + vxlan_interface
+ ' other_config:stp-path-cost=10'
846 print self
.name
, ': command:', command
847 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
848 content
= stdout
.read()
850 if len(content
) == 0:
def delete_ovs_vxlan_tunnel(self, vxlan_interface):
    """
    Remove a vxlan tunnel port from the 'br-int' OVS bridge.
    :param vxlan_interface: name of the vxlan port to delete.
    :return: True if the command printed nothing on stdout, False otherwise.
    """
    del_port_cmd = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
    print("%s %s %s" % (self.name, ': command:', del_port_cmd))
    (_, out_stream, _) = self.ssh_conn.exec_command(del_port_cmd)
    printed = out_stream.read()

    if len(printed) != 0:
        return False
    return True
def delete_ovs_bridge(self):
    """
    Delete the 'br-int' OVS bridge from a compute.
    :return: True if the command printed nothing on stdout, False otherwise
    """
    del_br_cmd = 'sudo ovs-vsctl del-br br-int'
    print("%s %s %s" % (self.name, ': command:', del_br_cmd))
    (_, out_stream, _) = self.ssh_conn.exec_command(del_br_cmd)
    nothing_printed = len(out_stream.read()) == 0
    return nothing_printed
def get_file_info(self, path):
    '''Run 'ls -lL' with an ISO-like time style over a path at the host.

    path: appended as is to the command line
    Return: None when the command printed nothing on stdout (file does not
        exist); otherwise the stdout text split by single spaces:
        (permission, 1, owner, group, size, date, file)
    '''
    ls_cmd = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
    print("%s %s %s" % (self.name, ': command:', ls_cmd))
    (_, out_stream, _) = self.ssh_conn.exec_command(ls_cmd)
    listing = out_stream.read()
    if len(listing) > 0:
        return listing.split(" ")  # (permission, 1, owner, group, size, date, file)
    return None  # file does not exist
def qemu_get_info(self, path):
    '''Obtain the 'qemu-img info' of a file at the host.

    Runs 'qemu-img info <path>' through self.ssh_conn and parses the text
    printed on stdout as yaml.

    path: file to inspect, appended as is to the command line
    Return: the object obtained from parsing the command output
    Raises paramiko.ssh_exception.SSHException when the command prints
        nothing on stdout (the message carries its stderr) or when the
        output cannot be parsed as yaml
    '''
    command = 'qemu-img info ' + path
    print("%s %s %s" % (self.name, ': command:', command))
    (_, stdout, stderr) = self.ssh_conn.exec_command(command)
    content = stdout.read()
    if len(content) == 0:
        error = stderr.read()
        print("%s %s %s" % (self.name, ": get_qemu_info error ", error))
        raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
    try:
        # NOTE(review): yaml.load can construct arbitrary objects; the text
        # parsed here is produced by the managed host. Consider
        # yaml.safe_load if that output is not fully trusted.
        return yaml.load(content)
    except yaml.YAMLError as exc:
        # problem_mark can be absent or None depending on the error; using
        # it unguarded would replace the yaml error with an AttributeError.
        mark = getattr(exc, 'problem_mark', None)
        text = ""
        if mark is not None:
            text = " at position: (%s:%s)" % (mark.line + 1, mark.column + 1)
        print("%s %s %s" % (self.name, ": get_qemu_info yaml format Exception", text))
        raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
915 def qemu_change_backing(self
, inc_file
, new_backing_file
):
916 command
= 'qemu-img rebase -u -b ' + new_backing_file
+ ' ' + inc_file
917 print self
.name
, ': command:', command
918 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
919 content
= stderr
.read()
920 if len(content
) == 0:
923 print self
.name
, ": qemu_change_backing error: ", content
926 def get_notused_filename(self
, proposed_name
, suffix
=''):
927 '''Look for a non existing file_name in the host
928 proposed_name: proposed file name, includes path
929 suffix: suffix to be added to the name, before the extention
931 extension
= proposed_name
.rfind(".")
932 slash
= proposed_name
.rfind("/")
933 if extension
< 0 or extension
< slash
: # no extension
934 extension
= len(proposed_name
)
935 target_name
= proposed_name
[:extension
] + suffix
+ proposed_name
[extension
:]
936 info
= self
.get_file_info(target_name
)
941 while info
is not None:
942 target_name
= proposed_name
[:extension
] + suffix
+ "-" + str(index
) + proposed_name
[extension
:]
944 info
= self
.get_file_info(target_name
)
947 def get_notused_path(self
, proposed_path
, suffix
=''):
948 '''Look for a non existing path at database for images
949 proposed_path: proposed file name, includes path
950 suffix: suffix to be added to the name, before the extention
952 extension
= proposed_path
.rfind(".")
954 extension
= len(proposed_path
)
956 target_path
= proposed_path
[:extension
] + suffix
+ proposed_path
[extension
:]
959 r
,_
=self
.db
.get_table(FROM
="images",WHERE
={"path":target_path
})
962 target_path
= proposed_path
[:extension
] + suffix
+ "-" + str(index
) + proposed_path
[extension
:]
def delete_file(self, file_name):
    '''Delete a file at the host with 'rm -f'.

    file_name: path of the file to delete. It is shell-quoted before being
        added to the command line, so names with spaces or shell
        metacharacters are deleted literally; plain paths produce the same
        command as before.
    Raises paramiko.ssh_exception.SSHException when the command prints
        anything on stderr
    '''
    try:
        from shlex import quote as shell_quote  # Python 3
    except ImportError:
        from pipes import quote as shell_quote  # Python 2
    command = 'rm -f ' + shell_quote(file_name)
    print("%s %s %s" % (self.name, ': command:', command))
    (_, _, stderr) = self.ssh_conn.exec_command(command)
    error_msg = stderr.read()
    if len(error_msg) > 0:
        raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
def copy_file(self, source, destination, perserve_time=True):
    '''Copy a file to 'destination' by running a command at the host.

    source: when its first four characters are "http" it is fetched with
        wget, whose stderr goes to '<destination>.result' and is echoed to
        stderr only on failure; otherwise it is copied with cp
    destination: target path at the host
    perserve_time: only for cp, adds '--preserve=timestamps'
    Raises paramiko.ssh_exception.SSHException when the command prints
        anything on stderr
    '''
    if source[0:4] != "http":
        pieces = ['cp --no-preserve=mode']
        if perserve_time:
            pieces.append(' --preserve=timestamps')
        pieces.append(" '{}' '{}'".format(source, destination))
        command = ''.join(pieces)
    else:
        result_file = destination + ".result"
        command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
            dst=destination, src=source, dst_result=result_file)
    print("%s %s %s" % (self.name, ': command:', command))
    (_, _, err_stream) = self.ssh_conn.exec_command(command)
    error_msg = err_stream.read()
    if len(error_msg) > 0:
        raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
989 def copy_remote_file(self
, remote_file
, use_incremental
):
990 ''' Copy a file from the repository to local folder and recursively
991 copy the backing files in case the remote file is incremental
992 Read and/or modified self.localinfo['files'] that contain the
993 unmodified copies of images in the local path
995 remote_file: path of remote file
996 use_incremental: None (leave the decision to this function), True, False
998 local_file: name of local file
999 qemu_info: dict with quemu information of local file
1000 use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
1003 use_incremental_out
= use_incremental
1004 new_backing_file
= None
1006 file_from_local
= True
1008 #in case incremental use is not decided, take the decision depending on the image
1009 #avoid the use of incremental if this image is already incremental
1010 if remote_file
[0:4] == "http":
1011 file_from_local
= False
1013 qemu_remote_info
= self
.qemu_get_info(remote_file
)
1014 if use_incremental_out
==None:
1015 use_incremental_out
= not ( file_from_local
and 'backing file' in qemu_remote_info
)
1016 #copy recursivelly the backing files
1017 if file_from_local
and 'backing file' in qemu_remote_info
:
1018 new_backing_file
, _
, _
= self
.copy_remote_file(qemu_remote_info
['backing file'], True)
1020 #check if remote file is present locally
1021 if use_incremental_out
and remote_file
in self
.localinfo
['files']:
1022 local_file
= self
.localinfo
['files'][remote_file
]
1023 local_file_info
= self
.get_file_info(local_file
)
1025 remote_file_info
= self
.get_file_info(remote_file
)
1026 if local_file_info
== None:
1028 elif file_from_local
and (local_file_info
[4]!=remote_file_info
[4] or local_file_info
[5]!=remote_file_info
[5]):
1029 #local copy of file not valid because date or size are different.
1030 #TODO DELETE local file if this file is not used by any active virtual machine
1032 self
.delete_file(local_file
)
1033 del self
.localinfo
['files'][remote_file
]
1037 else: #check that the local file has the same backing file, or there are not backing at all
1038 qemu_info
= self
.qemu_get_info(local_file
)
1039 if new_backing_file
!= qemu_info
.get('backing file'):
1043 if local_file
== None: #copy the file
1044 img_name
= remote_file
.split('/') [-1]
1045 img_local
= self
.image_path
+ '/' + img_name
1046 local_file
= self
.get_notused_filename(img_local
)
1047 self
.copy_file(remote_file
, local_file
, use_incremental_out
)
1049 if use_incremental_out
:
1050 self
.localinfo
['files'][remote_file
] = local_file
1051 if new_backing_file
:
1052 self
.qemu_change_backing(local_file
, new_backing_file
)
1053 qemu_info
= self
.qemu_get_info(local_file
)
1055 return local_file
, qemu_info
, use_incremental_out
1057 def launch_server(self
, conn
, server
, rebuild
=False, domain
=None):
1059 time
.sleep(random
.randint(20,150)) #sleep random timeto be make it a bit more real
1062 server_id
= server
['uuid']
1063 paused
= server
.get('paused','no')
1065 if domain
!=None and rebuild
==False:
1067 #self.server_status[server_id] = 'ACTIVE'
1070 self
.db_lock
.acquire()
1071 result
, server_data
= self
.db
.get_instance(server_id
)
1072 self
.db_lock
.release()
1074 print self
.name
, ": launch_server ERROR getting server from DB",result
, server_data
1075 return result
, server_data
1077 #0: get image metadata
1078 server_metadata
= server
.get('metadata', {})
1079 use_incremental
= None
1081 if "use_incremental" in server_metadata
:
1082 use_incremental
= False if server_metadata
["use_incremental"]=="no" else True
1084 server_host_files
= self
.localinfo
['server_files'].get( server
['uuid'], {})
1086 #delete previous incremental files
1087 for file_
in server_host_files
.values():
1088 self
.delete_file(file_
['source file'] )
1089 server_host_files
={}
1091 #1: obtain aditional devices (disks)
1092 #Put as first device the main disk
1093 devices
= [ {"type":"disk", "image_id":server
['image_id'], "vpci":server_metadata
.get('vpci', None) } ]
1094 if 'extended' in server_data
and server_data
['extended']!=None and "devices" in server_data
['extended']:
1095 devices
+= server_data
['extended']['devices']
1098 if dev
['image_id'] == None:
1101 self
.db_lock
.acquire()
1102 result
, content
= self
.db
.get_table(FROM
='images', SELECT
=('path','metadata'),WHERE
={'uuid':dev
['image_id']} )
1103 self
.db_lock
.release()
1105 error_text
= "ERROR", result
, content
, "when getting image", dev
['image_id']
1106 print self
.name
, ": launch_server", error_text
1107 return -1, error_text
1108 if content
[0]['metadata'] is not None:
1109 dev
['metadata'] = json
.loads(content
[0]['metadata'])
1111 dev
['metadata'] = {}
1113 if dev
['image_id'] in server_host_files
:
1114 dev
['source file'] = server_host_files
[ dev
['image_id'] ] ['source file'] #local path
1115 dev
['file format'] = server_host_files
[ dev
['image_id'] ] ['file format'] # raw or qcow2
1118 #2: copy image to host
1119 remote_file
= content
[0]['path']
1120 use_incremental_image
= use_incremental
1121 if dev
['metadata'].get("use_incremental") == "no":
1122 use_incremental_image
= False
1123 local_file
, qemu_info
, use_incremental_image
= self
.copy_remote_file(remote_file
, use_incremental_image
)
1125 #create incremental image
1126 if use_incremental_image
:
1127 local_file_inc
= self
.get_notused_filename(local_file
, '.inc')
1128 command
= 'qemu-img create -f qcow2 '+local_file_inc
+ ' -o backing_file='+ local_file
1129 print 'command:', command
1130 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
1131 error_msg
= stderr
.read()
1132 if len(error_msg
) > 0:
1133 raise paramiko
.ssh_exception
.SSHException("Error creating incremental file: " + error_msg
)
1134 local_file
= local_file_inc
1135 qemu_info
= {'file format':'qcow2'}
1137 server_host_files
[ dev
['image_id'] ] = {'source file': local_file
, 'file format': qemu_info
['file format']}
1139 dev
['source file'] = local_file
1140 dev
['file format'] = qemu_info
['file format']
1142 self
.localinfo
['server_files'][ server
['uuid'] ] = server_host_files
1143 self
.localinfo_dirty
= True
1146 result
, xml
= self
.create_xml_server(server_data
, devices
, server_metadata
) #local_file
1148 print self
.name
, ": create xml server error:", xml
1150 print self
.name
, ": create xml:", xml
1151 atribute
= host_thread
.lvirt_module
.VIR_DOMAIN_START_PAUSED
if paused
== "yes" else 0
1153 if not rebuild
: #ensures that any pending destroying server is done
1154 self
.server_forceoff(True)
1155 #print self.name, ": launching instance" #, xml
1156 conn
.createXML(xml
, atribute
)
1157 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
1161 except paramiko
.ssh_exception
.SSHException
as e
:
1163 print self
.name
, ": launch_server(%s) ssh Exception: %s" %(server_id
, text
)
1164 if "SSH session not active" in text
:
1166 except host_thread
.lvirt_module
.libvirtError
as e
:
1167 text
= e
.get_error_message()
1168 print self
.name
, ": launch_server(%s) libvirt Exception: %s" %(server_id
, text
)
1169 except Exception as e
:
1171 print self
.name
, ": launch_server(%s) Exception: %s" %(server_id
, text
)
1174 def update_servers_status(self
):
1176 # VIR_DOMAIN_NOSTATE = 0
1177 # VIR_DOMAIN_RUNNING = 1
1178 # VIR_DOMAIN_BLOCKED = 2
1179 # VIR_DOMAIN_PAUSED = 3
1180 # VIR_DOMAIN_SHUTDOWN = 4
1181 # VIR_DOMAIN_SHUTOFF = 5
1182 # VIR_DOMAIN_CRASHED = 6
1183 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended
1185 if self
.test
or len(self
.server_status
)==0:
1189 conn
= host_thread
.lvirt_module
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1190 domains
= conn
.listAllDomains()
1192 for domain
in domains
:
1193 uuid
= domain
.UUIDString() ;
1194 libvirt_status
= domain
.state()
1195 #print libvirt_status
1196 if libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_RUNNING
or libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_SHUTDOWN
:
1197 new_status
= "ACTIVE"
1198 elif libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_PAUSED
:
1199 new_status
= "PAUSED"
1200 elif libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_SHUTOFF
:
1201 new_status
= "INACTIVE"
1202 elif libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_CRASHED
:
1203 new_status
= "ERROR"
1206 domain_dict
[uuid
] = new_status
1208 except host_thread
.lvirt_module
.libvirtError
as e
:
1209 print self
.name
, ": get_state() Exception '", e
.get_error_message()
1212 for server_id
, current_status
in self
.server_status
.iteritems():
1214 if server_id
in domain_dict
:
1215 new_status
= domain_dict
[server_id
]
1217 new_status
= "INACTIVE"
1219 if new_status
== None or new_status
== current_status
:
1221 if new_status
== 'INACTIVE' and current_status
== 'ERROR':
1222 continue #keep ERROR status, because obviously this machine is not running
1224 print self
.name
, ": server ", server_id
, "status change from ", current_status
, "to", new_status
1225 STATUS
={'progress':100, 'status':new_status
}
1226 if new_status
== 'ERROR':
1227 STATUS
['last_error'] = 'machine has crashed'
1228 self
.db_lock
.acquire()
1229 r
,_
= self
.db
.update_rows('instances', STATUS
, {'uuid':server_id
}, log
=False)
1230 self
.db_lock
.release()
1232 self
.server_status
[server_id
] = new_status
1234 def action_on_server(self
, req
, last_retry
=True):
1235 '''Perform an action on a req
1237 req: dictionary that contain:
1238 server properties: 'uuid','name','tenant_id','status'
1240 host properties: 'user', 'ip_name'
1241 return (error, text)
1242 0: No error. VM is updated to new state,
1243 -1: Invalid action, as trying to pause a PAUSED VM
1244 -2: Error accessing host
1246 -4: Error at DB access
1247 -5: Error while trying to perform action. VM is updated to ERROR
1249 server_id
= req
['uuid']
1252 old_status
= req
['status']
1256 if 'terminate' in req
['action']:
1257 new_status
= 'deleted'
1258 elif 'shutoff' in req
['action'] or 'shutdown' in req
['action'] or 'forceOff' in req
['action']:
1259 if req
['status']!='ERROR':
1261 new_status
= 'INACTIVE'
1262 elif 'start' in req
['action'] and req
['status']!='ERROR': new_status
= 'ACTIVE'
1263 elif 'resume' in req
['action'] and req
['status']!='ERROR' and req
['status']!='INACTIVE' : new_status
= 'ACTIVE'
1264 elif 'pause' in req
['action'] and req
['status']!='ERROR': new_status
= 'PAUSED'
1265 elif 'reboot' in req
['action'] and req
['status']!='ERROR': new_status
= 'ACTIVE'
1266 elif 'rebuild' in req
['action']:
1267 time
.sleep(random
.randint(20,150))
1268 new_status
= 'ACTIVE'
1269 elif 'createImage' in req
['action']:
1271 self
.create_image(None, req
)
1274 conn
= host_thread
.lvirt_module
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1276 dom
= conn
.lookupByUUIDString(server_id
)
1277 except host_thread
.lvirt_module
.libvirtError
as e
:
1278 text
= e
.get_error_message()
1279 if 'LookupByUUIDString' in text
or 'Domain not found' in text
or 'No existe un dominio coincidente' in text
:
1282 print self
.name
, ": action_on_server(",server_id
,") libvirt exception:", text
1285 if 'forceOff' in req
['action']:
1287 print self
.name
, ": action_on_server(",server_id
,") domain not running"
1290 print self
.name
, ": sending DESTROY to server", server_id
1292 except Exception as e
:
1293 if "domain is not running" not in e
.get_error_message():
1294 print self
.name
, ": action_on_server(",server_id
,") Exception while sending force off:", e
.get_error_message()
1295 last_error
= 'action_on_server Exception while destroy: ' + e
.get_error_message()
1296 new_status
= 'ERROR'
1298 elif 'terminate' in req
['action']:
1300 print self
.name
, ": action_on_server(",server_id
,") domain not running"
1301 new_status
= 'deleted'
1304 if req
['action']['terminate'] == 'force':
1305 print self
.name
, ": sending DESTROY to server", server_id
1307 new_status
= 'deleted'
1309 print self
.name
, ": sending SHUTDOWN to server", server_id
1311 self
.pending_terminate_server
.append( (time
.time()+10,server_id
) )
1312 except Exception as e
:
1313 print self
.name
, ": action_on_server(",server_id
,") Exception while destroy:", e
.get_error_message()
1314 last_error
= 'action_on_server Exception while destroy: ' + e
.get_error_message()
1315 new_status
= 'ERROR'
1316 if "domain is not running" in e
.get_error_message():
1319 new_status
= 'deleted'
1321 print self
.name
, ": action_on_server(",server_id
,") Exception while undefine:", e
.get_error_message()
1322 last_error
= 'action_on_server Exception2 while undefine:', e
.get_error_message()
1323 #Exception: 'virDomainDetachDevice() failed'
1324 if new_status
=='deleted':
1325 if server_id
in self
.server_status
:
1326 del self
.server_status
[server_id
]
1327 if req
['uuid'] in self
.localinfo
['server_files']:
1328 for file_
in self
.localinfo
['server_files'][ req
['uuid'] ].values():
1330 self
.delete_file(file_
['source file'])
1333 del self
.localinfo
['server_files'][ req
['uuid'] ]
1334 self
.localinfo_dirty
= True
1336 elif 'shutoff' in req
['action'] or 'shutdown' in req
['action']:
1339 print self
.name
, ": action_on_server(",server_id
,") domain not running"
1342 # new_status = 'INACTIVE'
1343 #TODO: check status for changing at database
1344 except Exception as e
:
1345 new_status
= 'ERROR'
1346 print self
.name
, ": action_on_server(",server_id
,") Exception while shutdown:", e
.get_error_message()
1347 last_error
= 'action_on_server Exception while shutdown: ' + e
.get_error_message()
1349 elif 'rebuild' in req
['action']:
1352 r
= self
.launch_server(conn
, req
, True, None)
1354 new_status
= 'ERROR'
1357 new_status
= 'ACTIVE'
1358 elif 'start' in req
['action']:
1359 # The instance is only create in DB but not yet at libvirt domain, needs to be create
1360 rebuild
= True if req
['action']['start'] == 'rebuild' else False
1361 r
= self
.launch_server(conn
, req
, rebuild
, dom
)
1363 new_status
= 'ERROR'
1366 new_status
= 'ACTIVE'
1368 elif 'resume' in req
['action']:
1374 # new_status = 'ACTIVE'
1375 except Exception as e
:
1376 print self
.name
, ": action_on_server(",server_id
,") Exception while resume:", e
.get_error_message()
1378 elif 'pause' in req
['action']:
1384 # new_status = 'PAUSED'
1385 except Exception as e
:
1386 print self
.name
, ": action_on_server(",server_id
,") Exception while pause:", e
.get_error_message()
1388 elif 'reboot' in req
['action']:
1394 print self
.name
, ": action_on_server(",server_id
,") reboot:"
1395 #new_status = 'ACTIVE'
1396 except Exception as e
:
1397 print self
.name
, ": action_on_server(",server_id
,") Exception while reboot:", e
.get_error_message()
1398 elif 'createImage' in req
['action']:
1399 self
.create_image(dom
, req
)
1403 except host_thread
.lvirt_module
.libvirtError
as e
:
1404 if conn
is not None: conn
.close()
1405 text
= e
.get_error_message()
1406 new_status
= "ERROR"
1408 print self
.name
, ": action_on_server(",server_id
,") Exception '", text
1409 if 'LookupByUUIDString' in text
or 'Domain not found' in text
or 'No existe un dominio coincidente' in text
:
1410 print self
.name
, ": action_on_server(",server_id
,") Exception removed from host"
1411 #end of if self.test
1412 if new_status
== None:
1415 print self
.name
, ": action_on_server(",server_id
,") new status", new_status
, last_error
1416 UPDATE
= {'progress':100, 'status':new_status
}
1418 if new_status
=='ERROR':
1419 if not last_retry
: #if there will be another retry do not update database
1421 elif 'terminate' in req
['action']:
1422 #PUT a log in the database
1423 print self
.name
, ": PANIC deleting server", server_id
, last_error
1424 self
.db_lock
.acquire()
1425 self
.db
.new_row('logs',
1426 {'uuid':server_id
, 'tenant_id':req
['tenant_id'], 'related':'instances','level':'panic',
1427 'description':'PANIC deleting server from host '+self
.name
+': '+last_error
}
1429 self
.db_lock
.release()
1430 if server_id
in self
.server_status
:
1431 del self
.server_status
[server_id
]
1434 UPDATE
['last_error'] = last_error
1435 if new_status
!= 'deleted' and (new_status
!= old_status
or new_status
== 'ERROR') :
1436 self
.db_lock
.acquire()
1437 self
.db
.update_rows('instances', UPDATE
, {'uuid':server_id
}, log
=True)
1438 self
.server_status
[server_id
] = new_status
1439 self
.db_lock
.release()
1440 if new_status
== 'ERROR':
1445 def restore_iface(self
, name
, mac
, lib_conn
=None):
1446 ''' make an ifdown, ifup to restore default parameter of na interface
1448 mac: mac address of the interface
1449 lib_conn: connection to the libvirt, if None a new connection is created
1450 Return 0,None if ok, -1,text if fails
1456 print self
.name
, ": restore_iface '%s' %s" % (name
, mac
)
1460 conn
= host_thread
.lvirt_module
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1464 #wait to the pending VM deletion
1465 #TODO.Revise self.server_forceoff(True)
1467 iface
= conn
.interfaceLookupByMACString(mac
)
1470 print self
.name
, ": restore_iface '%s' %s" % (name
, mac
)
1471 except host_thread
.lvirt_module
.libvirtError
as e
:
1472 error_text
= e
.get_error_message()
1473 print self
.name
, ": restore_iface '%s' '%s' libvirt exception: %s" %(name
, mac
, error_text
)
1476 if lib_conn
is None and conn
is not None:
1478 return ret
, error_text
1481 def create_image(self
,dom
, req
):
1483 if 'path' in req
['action']['createImage']:
1484 file_dst
= req
['action']['createImage']['path']
1486 createImage
=req
['action']['createImage']
1487 img_name
= createImage
['source']['path']
1488 index
=img_name
.rfind('/')
1489 file_dst
= self
.get_notused_path(img_name
[:index
+1] + createImage
['name'] + '.qcow2')
1490 image_status
='ACTIVE'
1494 server_id
= req
['uuid']
1495 createImage
=req
['action']['createImage']
1496 file_orig
= self
.localinfo
['server_files'][server_id
] [ createImage
['source']['image_id'] ] ['source file']
1497 if 'path' in req
['action']['createImage']:
1498 file_dst
= req
['action']['createImage']['path']
1500 img_name
= createImage
['source']['path']
1501 index
=img_name
.rfind('/')
1502 file_dst
= self
.get_notused_filename(img_name
[:index
+1] + createImage
['name'] + '.qcow2')
1504 self
.copy_file(file_orig
, file_dst
)
1505 qemu_info
= self
.qemu_get_info(file_orig
)
1506 if 'backing file' in qemu_info
:
1507 for k
,v
in self
.localinfo
['files'].items():
1508 if v
==qemu_info
['backing file']:
1509 self
.qemu_change_backing(file_dst
, k
)
1511 image_status
='ACTIVE'
1513 except paramiko
.ssh_exception
.SSHException
as e
:
1514 image_status
='ERROR'
1515 error_text
= e
.args
[0]
1516 print self
.name
, "': create_image(",server_id
,") ssh Exception:", error_text
1517 if "SSH session not active" in error_text
and retry
==0:
1519 except Exception as e
:
1520 image_status
='ERROR'
1522 print self
.name
, "': create_image(",server_id
,") Exception:", error_text
1524 #TODO insert a last_error at database
1525 self
.db_lock
.acquire()
1526 self
.db
.update_rows('images', {'status':image_status
, 'progress': 100, 'path':file_dst
},
1527 {'uuid':req
['new_image']['uuid']}, log
=True)
1528 self
.db_lock
.release()
1530 def edit_iface(self
, port_id
, old_net
, new_net
):
1531 #This action imply remove and insert interface to put proper parameters
1536 self
.db_lock
.acquire()
1537 r
,c
= self
.db
.get_table(FROM
='ports as p join resources_port as rp on p.uuid=rp.port_id',
1538 WHERE
={'port_id': port_id
})
1539 self
.db_lock
.release()
1541 print self
.name
, ": edit_iface(",port_id
,") DDBB error:", c
1544 print self
.name
, ": edit_iface(",port_id
,") por not found"
1547 if port
["model"]!="VF":
1548 print self
.name
, ": edit_iface(",port_id
,") ERROR model must be VF"
1550 #create xml detach file
1553 xml
.append("<interface type='hostdev' managed='yes'>")
1554 xml
.append(" <mac address='" +port
['mac']+ "'/>")
1555 xml
.append(" <source>"+ self
.pci2xml(port
['pci'])+"\n </source>")
1556 xml
.append('</interface>')
1561 conn
= host_thread
.lvirt_module
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1562 dom
= conn
.lookupByUUIDString(port
["instance_id"])
1565 print self
.name
, ": edit_iface detaching SRIOV interface", text
1566 dom
.detachDeviceFlags(text
, flags
=host_thread
.lvirt_module
.VIR_DOMAIN_AFFECT_LIVE
)
1568 xml
[-1] =" <vlan> <tag id='" + str(port
['vlan']) + "'/> </vlan>"
1570 xml
.append(self
.pci2xml(port
.get('vpci',None)) )
1571 xml
.append('</interface>')
1573 print self
.name
, ": edit_iface attaching SRIOV interface", text
1574 dom
.attachDeviceFlags(text
, flags
=host_thread
.lvirt_module
.VIR_DOMAIN_AFFECT_LIVE
)
1576 except host_thread
.lvirt_module
.libvirtError
as e
:
1577 text
= e
.get_error_message()
1578 print self
.name
, ": edit_iface(",port
["instance_id"],") libvirt exception:", text
1581 if conn
is not None: conn
.close()
1584 def create_server(server
, db
, db_lock
, only_of_ports
):
1591 # host_id = server.get('host_id', None)
1592 extended
= server
.get('extended', None)
1594 # print '----------------------'
1595 # print json.dumps(extended, indent=4)
1598 requirements
['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1599 requirements
['ram'] = server
['flavor'].get('ram', 0)
1600 if requirements
['ram']== None:
1601 requirements
['ram'] = 0
1602 requirements
['vcpus'] = server
['flavor'].get('vcpus', 0)
1603 if requirements
['vcpus']== None:
1604 requirements
['vcpus'] = 0
1605 #If extended is not defined get requirements from flavor
1606 if extended
is None:
1607 #If extended is defined in flavor convert to dictionary and use it
1608 if 'extended' in server
['flavor'] and server
['flavor']['extended'] != None:
1609 json_acceptable_string
= server
['flavor']['extended'].replace("'", "\"")
1610 extended
= json
.loads(json_acceptable_string
)
1613 #print json.dumps(extended, indent=4)
1615 #For simplicity only one numa VM are supported in the initial implementation
1616 if extended
!= None:
1617 numas
= extended
.get('numas', [])
1619 return (-2, "Multi-NUMA VMs are not supported yet")
1621 # return (-1, "At least one numa must be specified")
1623 #a for loop is used in order to be ready to multi-NUMA VMs
1627 numa_req
['memory'] = numa
.get('memory', 0)
1629 numa_req
['proc_req_nb'] = numa
['cores'] #number of cores or threads to be reserved
1630 numa_req
['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1631 numa_req
['proc_req_list'] = numa
.get('cores-id', None) #list of ids to be assigned to the cores or threads
1632 elif 'paired-threads' in numa
:
1633 numa_req
['proc_req_nb'] = numa
['paired-threads']
1634 numa_req
['proc_req_type'] = 'paired-threads'
1635 numa_req
['proc_req_list'] = numa
.get('paired-threads-id', None)
1636 elif 'threads' in numa
:
1637 numa_req
['proc_req_nb'] = numa
['threads']
1638 numa_req
['proc_req_type'] = 'threads'
1639 numa_req
['proc_req_list'] = numa
.get('threads-id', None)
1641 numa_req
['proc_req_nb'] = 0 # by default
1642 numa_req
['proc_req_type'] = 'threads'
1646 #Generate a list of sriov and another for physical interfaces
1647 interfaces
= numa
.get('interfaces', [])
1650 for iface
in interfaces
:
1651 iface
['bandwidth'] = int(iface
['bandwidth'])
1652 if iface
['dedicated'][:3]=='yes':
1653 port_list
.append(iface
)
1655 sriov_list
.append(iface
)
1657 #Save lists ordered from more restrictive to less bw requirements
1658 numa_req
['sriov_list'] = sorted(sriov_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1659 numa_req
['port_list'] = sorted(port_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1662 request
.append(numa_req
)
1664 # print "----------\n"+json.dumps(request[0], indent=4)
1665 # print '----------\n\n'
1667 #Search in db for an appropriate numa for each requested numa
1668 #at the moment multi-NUMA VMs are not supported
1670 requirements
['numa'].update(request
[0])
1671 if requirements
['numa']['memory']>0:
1672 requirements
['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
1673 elif requirements
['ram']==0:
1674 return (-1, "Memory information not set neither at extended field not at ram")
1675 if requirements
['numa']['proc_req_nb']>0:
1676 requirements
['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
1677 elif requirements
['vcpus']==0:
1678 return (-1, "Processor information not set neither at extended field not at vcpus")
1682 result
, content
= db
.get_numas(requirements
, server
.get('host_id', None), only_of_ports
)
1686 return (-1, content
)
1688 numa_id
= content
['numa_id']
1689 host_id
= content
['host_id']
1691 #obtain threads_id and calculate pinning
1694 if requirements
['numa']['proc_req_nb']>0:
1696 result
, content
= db
.get_table(FROM
='resources_core',
1697 SELECT
=('id','core_id','thread_id'),
1698 WHERE
={'numa_id':numa_id
,'instance_id': None, 'status':'ok'} )
1704 #convert rows to a dictionary indexed by core_id
1707 if not row
['core_id'] in cores_dict
:
1708 cores_dict
[row
['core_id']] = []
1709 cores_dict
[row
['core_id']].append([row
['thread_id'],row
['id']])
1711 #In case full cores are requested
1713 if requirements
['numa']['proc_req_type'] == 'cores':
1714 #Get/create the list of the vcpu_ids
1715 vcpu_id_list
= requirements
['numa']['proc_req_list']
1716 if vcpu_id_list
== None:
1717 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1719 for threads
in cores_dict
.itervalues():
1721 if len(threads
) != 2:
1724 #set pinning for the first thread
1725 cpu_pinning
.append( [ vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1] ] )
1727 #reserve so it is not used the second thread
1728 reserved_threads
.append(threads
[1][1])
1730 if len(vcpu_id_list
) == 0:
1733 #In case paired threads are requested
1734 elif requirements
['numa']['proc_req_type'] == 'paired-threads':
1736 #Get/create the list of the vcpu_ids
1737 if requirements
['numa']['proc_req_list'] != None:
1739 for pair
in requirements
['numa']['proc_req_list']:
1741 return -1, "Field paired-threads-id not properly specified"
1743 vcpu_id_list
.append(pair
[0])
1744 vcpu_id_list
.append(pair
[1])
1746 vcpu_id_list
= range(0,2*int(requirements
['numa']['proc_req_nb']))
1748 for threads
in cores_dict
.itervalues():
1750 if len(threads
) != 2:
1752 #set pinning for the first thread
1753 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1755 #set pinning for the second thread
1756 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1758 if len(vcpu_id_list
) == 0:
1761 #In case normal threads are requested
1762 elif requirements
['numa']['proc_req_type'] == 'threads':
1763 #Get/create the list of the vcpu_ids
1764 vcpu_id_list
= requirements
['numa']['proc_req_list']
1765 if vcpu_id_list
== None:
1766 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1768 for threads_index
in sorted(cores_dict
, key
=lambda k
: len(cores_dict
[k
])):
1769 threads
= cores_dict
[threads_index
]
1770 #set pinning for the first thread
1771 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1773 #if exists, set pinning for the second thread
1774 if len(threads
) == 2 and len(vcpu_id_list
) != 0:
1775 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1777 if len(vcpu_id_list
) == 0:
1780 #Get the source pci addresses for the selected numa
1781 used_sriov_ports
= []
1782 for port
in requirements
['numa']['sriov_list']:
1784 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1790 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1792 port
['pci'] = row
['pci']
1793 if 'mac_address' not in port
:
1794 port
['mac_address'] = row
['mac']
1796 port
['port_id']=row
['id']
1797 port
['Mbps_used'] = port
['bandwidth']
1798 used_sriov_ports
.append(row
['id'])
1801 for port
in requirements
['numa']['port_list']:
1802 port
['Mbps_used'] = None
1803 if port
['dedicated'] != "yes:sriov":
1804 port
['mac_address'] = port
['mac']
1808 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac', 'Mbps'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1813 port
['Mbps_used'] = content
[0]['Mbps']
1815 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1817 port
['pci'] = row
['pci']
1818 if 'mac_address' not in port
:
1819 port
['mac_address'] = row
['mac'] # mac cannot be set to passthrough ports
1821 port
['port_id']=row
['id']
1822 used_sriov_ports
.append(row
['id'])
1825 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1826 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1828 server
['host_id'] = host_id
1831 #Generate dictionary for saving in db the instance resources
1833 resources
['bridged-ifaces'] = []
1836 numa_dict
['interfaces'] = []
1838 numa_dict
['interfaces'] += requirements
['numa']['port_list']
1839 numa_dict
['interfaces'] += requirements
['numa']['sriov_list']
1841 #Check bridge information
1842 unified_dataplane_iface
=[]
1843 unified_dataplane_iface
+= requirements
['numa']['port_list']
1844 unified_dataplane_iface
+= requirements
['numa']['sriov_list']
1846 for control_iface
in server
.get('networks', []):
1847 control_iface
['net_id']=control_iface
.pop('uuid')
1848 #Get the brifge name
1850 result
, content
= db
.get_table(FROM
='nets', SELECT
=('name','type', 'vlan'),WHERE
={'uuid':control_iface
['net_id']} )
1855 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface
['net_id']
1858 if control_iface
.get("type", 'virtual') == 'virtual':
1859 if network
['type']!='bridge_data' and network
['type']!='bridge_man':
1860 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface
['net_id']
1861 resources
['bridged-ifaces'].append(control_iface
)
1863 if network
['type']!='data' and network
['type']!='ptp':
1864 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface
['net_id']
1865 #dataplane interface, look for it in the numa tree and asign this network
1867 for dataplane_iface
in numa_dict
['interfaces']:
1868 if dataplane_iface
['name'] == control_iface
.get("name"):
1869 if (dataplane_iface
['dedicated'] == "yes" and control_iface
["type"] != "PF") or \
1870 (dataplane_iface
['dedicated'] == "no" and control_iface
["type"] != "VF") or \
1871 (dataplane_iface
['dedicated'] == "yes:sriov" and control_iface
["type"] != "VFnotShared") :
1872 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1873 (control_iface
.get("name"), dataplane_iface
['dedicated'], control_iface
["type"])
1874 dataplane_iface
['uuid'] = control_iface
['net_id']
1875 if dataplane_iface
['dedicated'] == "no":
1876 dataplane_iface
['vlan'] = network
['vlan']
1877 if dataplane_iface
['dedicated'] != "yes" and control_iface
.get("mac_address"):
1878 dataplane_iface
['mac_address'] = control_iface
.get("mac_address")
1879 if control_iface
.get("vpci"):
1880 dataplane_iface
['vpci'] = control_iface
.get("vpci")
1884 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface
.get("name")
1886 resources
['host_id'] = host_id
1887 resources
['image_id'] = server
['image_id']
1888 resources
['flavor_id'] = server
['flavor_id']
1889 resources
['tenant_id'] = server
['tenant_id']
1890 resources
['ram'] = requirements
['ram']
1891 resources
['vcpus'] = requirements
['vcpus']
1892 resources
['status'] = 'CREATING'
1894 if 'description' in server
: resources
['description'] = server
['description']
1895 if 'name' in server
: resources
['name'] = server
['name']
1897 resources
['extended'] = {} #optional
1898 resources
['extended']['numas'] = []
1899 numa_dict
['numa_id'] = numa_id
1900 numa_dict
['memory'] = requirements
['numa']['memory']
1901 numa_dict
['cores'] = []
1903 for core
in cpu_pinning
:
1904 numa_dict
['cores'].append({'id': core
[2], 'vthread': core
[0], 'paired': paired
})
1905 for core
in reserved_threads
:
1906 numa_dict
['cores'].append({'id': core
})
1907 resources
['extended']['numas'].append(numa_dict
)
1908 if extended
!=None and 'devices' in extended
: #TODO allow extra devices without numa
1909 resources
['extended']['devices'] = extended
['devices']
1912 print '===================================={'
1913 print json
.dumps(resources
, indent
=4)
1914 print '====================================}'