1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
25 This thread interacts with the host and with libvirt to manage its VMs.
26 One thread is launched per host.
28 __author__
="Pablo Montes, Alfonso Tierno"
29 __date__
="$10-jul-2014 12:07:15$"
38 from jsonschema
import validate
as js_v
, exceptions
as js_e
41 from vim_schema
import localinfo_schema
, hostinfo_schema
43 #from logging import Logger
44 #import auxiliary_functions as af
46 #TODO: insert a logging system
49 lvirt_module
=None #libvirt module is charged only if not in test mode
51 class host_thread(threading
.Thread
):
52 def __init__(self
, name
, host
, user
, db
, db_lock
, test
, image_path
, host_id
, version
, develop_mode
, develop_bridge_iface
):
57 'host','user': host ip or name to manage and user
58 'db', 'db_lock': database class and lock to use it in exclusion
60 threading
.Thread
.__init
__(self
)
65 self
.db_lock
= db_lock
69 lvirt_module
= imp
.find_module("libvirt")
70 except (IOError, ImportError) as e
:
71 raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e
))
74 self
.develop_mode
= develop_mode
75 self
.develop_bridge_iface
= develop_bridge_iface
76 self
.image_path
= image_path
77 self
.host_id
= host_id
78 self
.version
= version
83 self
.server_status
= {} #dictionary with pairs server_uuid:server_status
84 self
.pending_terminate_server
=[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
85 self
.next_update_server_status
= 0 #time when must be check servers status
89 self
.queueLock
= threading
.Lock()
90 self
.taskQueue
= Queue
.Queue(2000)
92 def ssh_connect(self
):
95 self
.ssh_conn
= paramiko
.SSHClient()
96 self
.ssh_conn
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
97 self
.ssh_conn
.load_system_host_keys()
98 self
.ssh_conn
.connect(self
.host
, username
=self
.user
, timeout
=10) #, None)
99 except paramiko
.ssh_exception
.SSHException
as e
:
101 print self
.name
, ": ssh_connect ssh Exception:", text
103 def load_localinfo(self
):
109 command
= 'mkdir -p ' + self
.image_path
110 #print self.name, ': command:', command
111 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
112 content
= stderr
.read()
114 print self
.name
, ': command:', command
, "stderr:", content
116 command
= 'cat ' + self
.image_path
+ '/.openvim.yaml'
117 #print self.name, ': command:', command
118 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
119 content
= stdout
.read()
120 if len(content
) == 0:
121 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
122 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
123 self
.localinfo
= yaml
.load(content
)
124 js_v(self
.localinfo
, localinfo_schema
)
125 self
.localinfo_dirty
=False
126 if 'server_files' not in self
.localinfo
:
127 self
.localinfo
['server_files'] = {}
128 print self
.name
, ': localinfo load from host'
131 except paramiko
.ssh_exception
.SSHException
as e
:
133 print self
.name
, ": load_localinfo ssh Exception:", text
134 except lvirt_module
.libvirtError
as e
:
135 text
= e
.get_error_message()
136 print self
.name
, ": load_localinfo libvirt Exception:", text
137 except yaml
.YAMLError
as exc
:
139 if hasattr(exc
, 'problem_mark'):
140 mark
= exc
.problem_mark
141 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
142 print self
.name
, ": load_localinfo yaml format Exception", text
143 except js_e
.ValidationError
as e
:
145 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
146 print self
.name
, ": load_localinfo format Exception:", text
, e
.message
147 except Exception as e
:
149 print self
.name
, ": load_localinfo Exception:", text
151 #not loaded, insert a default data and force saving by activating dirty flag
152 self
.localinfo
= {'files':{}, 'server_files':{} }
153 #self.localinfo_dirty=True
154 self
.localinfo_dirty
=False
156 def load_hostinfo(self
):
164 command
= 'cat ' + self
.image_path
+ '/hostinfo.yaml'
165 #print self.name, ': command:', command
166 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
167 content
= stdout
.read()
168 if len(content
) == 0:
169 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
170 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
171 self
.hostinfo
= yaml
.load(content
)
172 js_v(self
.hostinfo
, hostinfo_schema
)
173 print self
.name
, ': hostlinfo load from host', self
.hostinfo
176 except paramiko
.ssh_exception
.SSHException
as e
:
178 print self
.name
, ": load_hostinfo ssh Exception:", text
179 except lvirt_module
.libvirtError
as e
:
180 text
= e
.get_error_message()
181 print self
.name
, ": load_hostinfo libvirt Exception:", text
182 except yaml
.YAMLError
as exc
:
184 if hasattr(exc
, 'problem_mark'):
185 mark
= exc
.problem_mark
186 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
187 print self
.name
, ": load_hostinfo yaml format Exception", text
188 except js_e
.ValidationError
as e
:
190 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
191 print self
.name
, ": load_hostinfo format Exception:", text
, e
.message
192 except Exception as e
:
194 print self
.name
, ": load_hostinfo Exception:", text
196 #not loaded, insert a default data
199 def save_localinfo(self
, tries
=3):
201 self
.localinfo_dirty
= False
208 command
= 'cat > ' + self
.image_path
+ '/.openvim.yaml'
209 print self
.name
, ': command:', command
210 (stdin
, _
, _
) = self
.ssh_conn
.exec_command(command
)
211 yaml
.safe_dump(self
.localinfo
, stdin
, explicit_start
=True, indent
=4, default_flow_style
=False, tags
=False, encoding
='utf-8', allow_unicode
=True)
212 self
.localinfo_dirty
= False
215 except paramiko
.ssh_exception
.SSHException
as e
:
217 print self
.name
, ": save_localinfo ssh Exception:", text
218 if "SSH session not active" in text
:
220 except lvirt_module
.libvirtError
as e
:
221 text
= e
.get_error_message()
222 print self
.name
, ": save_localinfo libvirt Exception:", text
223 except yaml
.YAMLError
as exc
:
225 if hasattr(exc
, 'problem_mark'):
226 mark
= exc
.problem_mark
227 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
228 print self
.name
, ": save_localinfo yaml format Exception", text
229 except Exception as e
:
231 print self
.name
, ": save_localinfo Exception:", text
233 def load_servers_from_db(self
):
234 self
.db_lock
.acquire()
235 r
,c
= self
.db
.get_table(SELECT
=('uuid','status', 'image_id'), FROM
='instances', WHERE
={'host_id': self
.host_id
})
236 self
.db_lock
.release()
238 self
.server_status
= {}
240 print self
.name
, ": Error getting data from database:", c
243 self
.server_status
[ server
['uuid'] ] = server
['status']
245 #convert from old version to new one
246 if 'inc_files' in self
.localinfo
and server
['uuid'] in self
.localinfo
['inc_files']:
247 server_files_dict
= {'source file': self
.localinfo
['inc_files'][ server
['uuid'] ] [0], 'file format':'raw' }
248 if server_files_dict
['source file'][-5:] == 'qcow2':
249 server_files_dict
['file format'] = 'qcow2'
251 self
.localinfo
['server_files'][ server
['uuid'] ] = { server
['image_id'] : server_files_dict
}
252 if 'inc_files' in self
.localinfo
:
253 del self
.localinfo
['inc_files']
254 self
.localinfo_dirty
= True
256 def delete_unused_files(self
):
257 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
258 Deletes unused entries at self.loacalinfo and the corresponding local files.
259 The only reason for this mismatch is the manual deletion of instances (VM) at database
263 for uuid
,images
in self
.localinfo
['server_files'].items():
264 if uuid
not in self
.server_status
:
265 for localfile
in images
.values():
267 print self
.name
, ": deleting file '%s' of unused server '%s'" %(localfile
['source file'], uuid
)
268 self
.delete_file(localfile
['source file'])
269 except paramiko
.ssh_exception
.SSHException
as e
:
270 print self
.name
, ": Exception deleting file '%s': %s" %(localfile
['source file'], str(e
))
271 del self
.localinfo
['server_files'][uuid
]
272 self
.localinfo_dirty
= True
def insert_task(self, task, *aditional):
    """Queue a task to be processed by this host thread.

    The task is stored in self.taskQueue as the tuple (task,) + aditional.

    Params:
        task: task name, e.g. 'instance', 'exit', 'reload', 'edit-iface'
        aditional: extra positional values stored after the task name
    Return:
        (1, None) when the task has been queued
        (-1, error_text) when the queue is still full after 5 seconds
    """
    # NOTE(review): the success 'return' statement is not visible in the
    # extract this was rewritten from; (1, None) is assumed so that it mirrors
    # the visible (-1, text) error tuple. Confirm against the callers.
    item = (task,) + aditional
    try:
        # 'with' guarantees the lock is released when put() times out;
        # previously the release was skipped on Queue.Full and the lock
        # stayed held forever.
        with self.queueLock:
            self.taskQueue.put(item, timeout=5)
    except Queue.Full:
        return -1, "timeout inserting a task over host " + self.name
    return 1, None
285 self
.load_localinfo()
287 self
.load_servers_from_db()
288 self
.delete_unused_files()
290 self
.queueLock
.acquire()
291 if not self
.taskQueue
.empty():
292 task
= self
.taskQueue
.get()
295 self
.queueLock
.release()
299 if self
.localinfo_dirty
:
300 self
.save_localinfo()
301 elif self
.next_update_server_status
< now
:
302 self
.update_servers_status()
303 self
.next_update_server_status
= now
+ 5
304 elif len(self
.pending_terminate_server
)>0 and self
.pending_terminate_server
[0][0]<now
:
305 self
.server_forceoff()
310 if task
[0] == 'instance':
311 print self
.name
, ": processing task instance", task
[1]['action']
315 r
=self
.action_on_server(task
[1], retry
==2)
318 elif task
[0] == 'image':
320 elif task
[0] == 'exit':
321 print self
.name
, ": processing task exit"
324 elif task
[0] == 'reload':
325 print self
.name
, ": processing task reload terminating and relaunching"
328 elif task
[0] == 'edit-iface':
329 print self
.name
, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task
[1], task
[2], task
[3])
330 self
.edit_iface(task
[1], task
[2], task
[3])
331 elif task
[0] == 'restore-iface':
332 print self
.name
, ": processing task restore-iface %s mac=%s" % (task
[1], task
[2])
333 self
.restore_iface(task
[1], task
[2])
335 print self
.name
, ": unknown task", task
337 def server_forceoff(self
, wait_until_finished
=False):
338 while len(self
.pending_terminate_server
)>0:
340 if self
.pending_terminate_server
[0][0]>now
:
341 if wait_until_finished
:
346 req
={'uuid':self
.pending_terminate_server
[0][1],
347 'action':{'terminate':'force'},
350 self
.action_on_server(req
)
351 self
.pending_terminate_server
.pop(0)
355 self
.server_forceoff(True)
356 if self
.localinfo_dirty
:
357 self
.save_localinfo()
359 self
.ssh_conn
.close()
360 except Exception as e
:
362 print self
.name
, ": terminate Exception:", text
363 print self
.name
, ": exit from host_thread"
365 def get_local_iface_name(self
, generic_name
):
366 if self
.hostinfo
!= None and "iface_names" in self
.hostinfo
and generic_name
in self
.hostinfo
["iface_names"]:
367 return self
.hostinfo
["iface_names"][generic_name
]
370 def create_xml_server(self
, server
, dev_list
, server_metadata
={}):
371 """Function that implements the generation of the VM XML definition.
372 Additional devices are in dev_list list
373 The main disk is upon dev_list[0]"""
375 #get if operating system is Windows
377 os_type
= server_metadata
.get('os_type', None)
378 if os_type
== None and 'metadata' in dev_list
[0]:
379 os_type
= dev_list
[0]['metadata'].get('os_type', None)
380 if os_type
!= None and os_type
.lower() == "windows":
382 #get type of hard disk bus
383 bus_ide
= True if windows_os
else False
384 bus
= server_metadata
.get('bus', None)
385 if bus
== None and 'metadata' in dev_list
[0]:
386 bus
= dev_list
[0]['metadata'].get('bus', None)
388 bus_ide
= True if bus
=='ide' else False
392 text
= "<domain type='kvm'>"
394 topo
= server_metadata
.get('topology', None)
395 if topo
== None and 'metadata' in dev_list
[0]:
396 topo
= dev_list
[0]['metadata'].get('topology', None)
398 name
= server
.get('name','') + "_" + server
['uuid']
399 name
= name
[:58] #qemu impose a length limit of 59 chars or not start. Using 58
400 text
+= self
.inc_tab() + "<name>" + name
+ "</name>"
402 text
+= self
.tab() + "<uuid>" + server
['uuid'] + "</uuid>"
405 if 'extended' in server
and server
['extended']!=None and 'numas' in server
['extended']:
406 numa
= server
['extended']['numas'][0]
409 memory
= int(numa
.get('memory',0))*1024*1024 #in KiB
411 memory
= int(server
['ram'])*1024;
413 if not self
.develop_mode
:
416 return -1, 'No memory assigned to instance'
418 text
+= self
.tab() + "<memory unit='KiB'>" +memory
+"</memory>"
419 text
+= self
.tab() + "<currentMemory unit='KiB'>" +memory
+ "</currentMemory>"
421 text
+= self
.tab()+'<memoryBacking>'+ \
422 self
.inc_tab() + '<hugepages/>'+ \
423 self
.dec_tab()+ '</memoryBacking>'
426 use_cpu_pinning
=False
427 vcpus
= int(server
.get("vcpus",0))
429 if 'cores-source' in numa
:
431 for index
in range(0, len(numa
['cores-source'])):
432 cpu_pinning
.append( [ numa
['cores-id'][index
], numa
['cores-source'][index
] ] )
434 if 'threads-source' in numa
:
436 for index
in range(0, len(numa
['threads-source'])):
437 cpu_pinning
.append( [ numa
['threads-id'][index
], numa
['threads-source'][index
] ] )
439 if 'paired-threads-source' in numa
:
441 for index
in range(0, len(numa
['paired-threads-source'])):
442 cpu_pinning
.append( [numa
['paired-threads-id'][index
][0], numa
['paired-threads-source'][index
][0] ] )
443 cpu_pinning
.append( [numa
['paired-threads-id'][index
][1], numa
['paired-threads-source'][index
][1] ] )
446 if use_cpu_pinning
and not self
.develop_mode
:
447 text
+= self
.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning
)) +"</vcpu>" + \
448 self
.tab()+'<cputune>'
450 for i
in range(0, len(cpu_pinning
)):
451 text
+= self
.tab() + "<vcpupin vcpu='" +str(cpu_pinning
[i
][0])+ "' cpuset='" +str(cpu_pinning
[i
][1]) +"'/>"
452 text
+= self
.dec_tab()+'</cputune>'+ \
453 self
.tab() + '<numatune>' +\
454 self
.inc_tab() + "<memory mode='strict' nodeset='" +str(numa
['source'])+ "'/>" +\
455 self
.dec_tab() + '</numatune>'
458 return -1, "Instance without number of cpus"
459 text
+= self
.tab()+"<vcpu>" + str(vcpus
) + "</vcpu>"
464 if dev
['type']=='cdrom' :
467 text
+= self
.tab()+ '<os>' + \
468 self
.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
470 text
+= self
.tab() + "<boot dev='cdrom'/>"
471 text
+= self
.tab() + "<boot dev='hd'/>" + \
472 self
.dec_tab()+'</os>'
474 text
+= self
.tab()+'<features>'+\
475 self
.inc_tab()+'<acpi/>' +\
476 self
.tab()+'<apic/>' +\
477 self
.tab()+'<pae/>'+ \
478 self
.dec_tab() +'</features>'
479 if windows_os
or topo
=="oneSocket":
480 text
+= self
.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
482 text
+= self
.tab() + "<cpu mode='host-model'></cpu>"
483 text
+= self
.tab() + "<clock offset='utc'/>" +\
484 self
.tab() + "<on_poweroff>preserve</on_poweroff>" + \
485 self
.tab() + "<on_reboot>restart</on_reboot>" + \
486 self
.tab() + "<on_crash>restart</on_crash>"
487 text
+= self
.tab() + "<devices>" + \
488 self
.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
489 self
.tab() + "<serial type='pty'>" +\
490 self
.inc_tab() + "<target port='0'/>" + \
491 self
.dec_tab() + "</serial>" +\
492 self
.tab() + "<console type='pty'>" + \
493 self
.inc_tab()+ "<target type='serial' port='0'/>" + \
494 self
.dec_tab()+'</console>'
496 text
+= self
.tab() + "<controller type='usb' index='0'/>" + \
497 self
.tab() + "<controller type='ide' index='0'/>" + \
498 self
.tab() + "<input type='mouse' bus='ps2'/>" + \
499 self
.tab() + "<sound model='ich6'/>" + \
500 self
.tab() + "<video>" + \
501 self
.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
502 self
.dec_tab() + "</video>" + \
503 self
.tab() + "<memballoon model='virtio'/>" + \
504 self
.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
506 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
507 #> self.dec_tab()+'</hostdev>\n' +\
508 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
510 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
512 #If image contains 'GRAPH' include graphics
513 #if 'GRAPH' in image:
514 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
515 self
.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
516 self
.dec_tab() + "</graphics>"
520 bus_ide_dev
= bus_ide
521 if dev
['type']=='cdrom' or dev
['type']=='disk':
522 if dev
['type']=='cdrom':
524 text
+= self
.tab() + "<disk type='file' device='"+dev
['type']+"'>"
525 if 'file format' in dev
:
526 text
+= self
.inc_tab() + "<driver name='qemu' type='" +dev
['file format']+ "' cache='writethrough'/>"
527 if 'source file' in dev
:
528 text
+= self
.tab() + "<source file='" +dev
['source file']+ "'/>"
529 #elif v['type'] == 'block':
530 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
532 # return -1, 'Unknown disk type ' + v['type']
533 vpci
= dev
.get('vpci',None)
535 vpci
= dev
['metadata'].get('vpci',None)
536 text
+= self
.pci2xml(vpci
)
539 text
+= self
.tab() + "<target dev='hd" +vd_index
+ "' bus='ide'/>" #TODO allows several type of disks
541 text
+= self
.tab() + "<target dev='vd" +vd_index
+ "' bus='virtio'/>"
542 text
+= self
.dec_tab() + '</disk>'
543 vd_index
= chr(ord(vd_index
)+1)
544 elif dev
['type']=='xml':
545 dev_text
= dev
['xml']
547 dev_text
= dev_text
.replace('__vpci__', dev
['vpci'])
548 if 'source file' in dev
:
549 dev_text
= dev_text
.replace('__file__', dev
['source file'])
550 if 'file format' in dev
:
551 dev_text
= dev_text
.replace('__format__', dev
['source file'])
552 if '__dev__' in dev_text
:
553 dev_text
= dev_text
.replace('__dev__', vd_index
)
554 vd_index
= chr(ord(vd_index
)+1)
557 return -1, 'Unknown device type ' + dev
['type']
560 bridge_interfaces
= server
.get('networks', [])
561 for v
in bridge_interfaces
:
563 self
.db_lock
.acquire()
564 result
, content
= self
.db
.get_table(FROM
='nets', SELECT
=('provider',),WHERE
={'uuid':v
['net_id']} )
565 self
.db_lock
.release()
567 print "create_xml_server ERROR getting nets",result
, content
569 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
570 #I know it is not secure
571 #for v in sorted(desc['network interfaces'].itervalues()):
572 model
= v
.get("model", None)
573 if content
[0]['provider']=='default':
574 text
+= self
.tab() + "<interface type='network'>" + \
575 self
.inc_tab() + "<source network='" +content
[0]['provider']+ "'/>"
576 elif content
[0]['provider'][0:7]=='macvtap':
577 text
+= self
.tab()+"<interface type='direct'>" + \
578 self
.inc_tab() + "<source dev='" + self
.get_local_iface_name(content
[0]['provider'][8:]) + "' mode='bridge'/>" + \
579 self
.tab() + "<target dev='macvtap0'/>"
581 text
+= self
.tab() + "<alias name='net" + str(net_nb
) + "'/>"
584 elif content
[0]['provider'][0:6]=='bridge':
585 text
+= self
.tab() + "<interface type='bridge'>" + \
586 self
.inc_tab()+"<source bridge='" +self
.get_local_iface_name(content
[0]['provider'][7:])+ "'/>"
588 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
589 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
593 return -1, 'Unknown Bridge net provider ' + content
[0]['provider']
595 text
+= self
.tab() + "<model type='" +model
+ "'/>"
596 if v
.get('mac_address', None) != None:
597 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
598 text
+= self
.pci2xml(v
.get('vpci',None))
599 text
+= self
.dec_tab()+'</interface>'
603 interfaces
= numa
.get('interfaces', [])
607 if self
.develop_mode
: #map these interfaces to bridges
608 text
+= self
.tab() + "<interface type='bridge'>" + \
609 self
.inc_tab()+"<source bridge='" +self
.develop_bridge_iface
+ "'/>"
611 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
612 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
614 text
+= self
.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
615 if v
.get('mac_address', None) != None:
616 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
617 text
+= self
.pci2xml(v
.get('vpci',None))
618 text
+= self
.dec_tab()+'</interface>'
621 if v
['dedicated'] == 'yes': #passthrought
622 text
+= self
.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
623 self
.inc_tab() + "<source>"
625 text
+= self
.pci2xml(v
['source'])
626 text
+= self
.dec_tab()+'</source>'
627 text
+= self
.pci2xml(v
.get('vpci',None))
629 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
630 text
+= self
.dec_tab()+'</hostdev>'
632 else: #sriov_interfaces
633 #skip not connected interfaces
634 if v
.get("net_id") == None:
636 text
+= self
.tab() + "<interface type='hostdev' managed='yes'>"
638 if v
.get('mac_address', None) != None:
639 text
+= self
.tab() + "<mac address='" +v
['mac_address']+ "'/>"
640 text
+= self
.tab()+'<source>'
642 text
+= self
.pci2xml(v
['source'])
643 text
+= self
.dec_tab()+'</source>'
644 if v
.get('vlan',None) != None:
645 text
+= self
.tab() + "<vlan> <tag id='" + str(v
['vlan']) + "'/> </vlan>"
646 text
+= self
.pci2xml(v
.get('vpci',None))
648 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
649 text
+= self
.dec_tab()+'</interface>'
652 text
+= self
.dec_tab()+'</devices>'+\
653 self
.dec_tab()+'</domain>'
def pci2xml(self, pci):
    """Generate the xml <address type='pci' .../> element for a pci address.

    Params:
        pci: text with the format XXXX:XX:XX.X (domain:bus:slot.function,
            hexadecimal digits without the '0x' prefix). None or an empty
            text is allowed and means that no address is requested.
    Return:
        The xml text preceded by the current indentation (self.tab()),
        or an empty text when pci is None or empty.
    """
    if not pci:
        # callers pass v.get('vpci', None) directly, so "no address" must
        # yield no xml instead of failing on None.split
        return ""
    first_part = pci.split(':')
    second_part = first_part[2].split('.')
    return self.tab() + \
        "<address type='pci' domain='0x{}' bus='0x{}' slot='0x{}' function='0x{}'/>".format(
            first_part[0], first_part[1], second_part[0], second_part[1])
668 """Return indentation according to xml_level"""
669 return "\n" + (' '*self
.xml_level
)
672 """Increment and return indentation according to xml_level"""
677 """Decrement and return indentation according to xml_level"""
def get_file_info(self, path):
    """Get the 'ls -lL' information of a file at the host.

    Params:
        path: file to inspect. It is appended to the command as it is.
    Return:
        None when the command writes nothing to stdout (file does not exist),
        otherwise the list
        [permission, links, owner, group, size, date, file]
        where the date has the format %Y-%m-%dT%H:%M:%S and the last item
        keeps the trailing end of line written by ls.
    """
    # NOTE(review): path is not quoted for the shell; confirm that callers
    # never pass names that need quoting.
    command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
    print("%s : command: %s" % (self.name, command))
    (_, stdout, _) = self.ssh_conn.exec_command(command)
    content = stdout.read()
    if not content:
        return None  # file does not exist
    # Split on any run of blanks and at most 6 times: padded columns do not
    # shift the size [4] and date [5] fields, and a file name containing
    # spaces stays complete in the last item.
    return content.split(None, 6)
def qemu_get_info(self, path):
    """Run 'qemu-img info' at the host over a file and parse its output.

    Params:
        path: file to inspect. It is appended to the command as it is.
    Return:
        The stdout of the command parsed as yaml.
    Raises:
        paramiko.ssh_exception.SSHException when the command writes nothing
        to stdout (the stderr content is included in the message) or when
        the output is not valid yaml.
    """
    command = 'qemu-img info ' + path
    print("%s : command: %s" % (self.name, command))
    (_, stdout, stderr) = self.ssh_conn.exec_command(command)
    content = stdout.read()
    if len(content) == 0:
        error = stderr.read()
        print("%s : get_qemu_info error  %s" % (self.name, error))
        raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
    try:
        # safe_load: the text comes from a command run at a remote host, so
        # only plain yaml types are accepted. yaml.load without an explicit
        # Loader can build arbitrary python objects.
        return yaml.safe_load(content)
    except yaml.YAMLError as exc:
        # position is reported only when the parser provides it
        text = ""
        if hasattr(exc, 'problem_mark'):
            mark = exc.problem_mark
            text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
        print("%s : get_qemu_info yaml format Exception %s" % (self.name, text))
        raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
711 def qemu_change_backing(self
, inc_file
, new_backing_file
):
712 command
= 'qemu-img rebase -u -b ' + new_backing_file
+ ' ' + inc_file
713 print self
.name
, ': command:', command
714 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
715 content
= stderr
.read()
716 if len(content
) == 0:
719 print self
.name
, ": qemu_change_backing error: ", content
722 def get_notused_filename(self
, proposed_name
, suffix
=''):
723 '''Look for a non existing file_name in the host
724 proposed_name: proposed file name, includes path
725 suffix: suffix to be added to the name, before the extention
727 extension
= proposed_name
.rfind(".")
728 slash
= proposed_name
.rfind("/")
729 if extension
< 0 or extension
< slash
: # no extension
730 extension
= len(proposed_name
)
731 target_name
= proposed_name
[:extension
] + suffix
+ proposed_name
[extension
:]
732 info
= self
.get_file_info(target_name
)
737 while info
is not None:
738 target_name
= proposed_name
[:extension
] + suffix
+ "-" + str(index
) + proposed_name
[extension
:]
740 info
= self
.get_file_info(target_name
)
743 def get_notused_path(self
, proposed_path
, suffix
=''):
744 '''Look for a non existing path at database for images
745 proposed_path: proposed file name, includes path
746 suffix: suffix to be added to the name, before the extention
748 extension
= proposed_path
.rfind(".")
750 extension
= len(proposed_path
)
752 target_path
= proposed_path
[:extension
] + suffix
+ proposed_path
[extension
:]
755 r
,_
=self
.db
.get_table(FROM
="images",WHERE
={"path":target_path
})
758 target_path
= proposed_path
[:extension
] + suffix
+ "-" + str(index
) + proposed_path
[extension
:]
def delete_file(self, file_name):
    """Delete a file at the host with 'rm -f' through the ssh connection.

    Params:
        file_name: path of the file to delete
    Raises:
        paramiko.ssh_exception.SSHException when the command writes anything
        to stderr; the stderr content is included in the message.
    """
    # Single-quote the path for the shell, as copy_file does, escaping any
    # embedded quote, so that blanks or shell metacharacters in the name
    # cannot alter the command.
    quoted_name = "'" + file_name.replace("'", "'\\''") + "'"
    command = 'rm -f ' + quoted_name
    print("%s : command: %s" % (self.name, command))
    (_, _, stderr) = self.ssh_conn.exec_command(command)
    error_msg = stderr.read()
    if len(error_msg) > 0:
        raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
def copy_file(self, source, destination, perserve_time=True):
    """Copy a file to 'destination' by running a command at the host.

    Params:
        source: origin of the file. When it starts with 'http' it is
            downloaded with wget, otherwise it is copied with cp.
        destination: path where the file is stored
        perserve_time: only used for cp; when True the option
            --preserve=timestamps is added
    Raises:
        paramiko.ssh_exception.SSHException when the command writes anything
        to stderr; the stderr content is included in the message.
    """
    def quote(text):
        # single-quote for the shell, escaping embedded single quotes so a
        # quote inside a path cannot terminate the quoting
        return "'" + text.replace("'", "'\\''") + "'"

    if source[0:4] == "http":
        # wget messages go to '<destination>.result'; they are sent to
        # stderr only when wget fails, and the file is removed afterwards
        command = "wget --no-verbose -O {dst} {src} 2>{dst_result} || cat {dst_result} >&2 && rm {dst_result}".format(
            dst=quote(destination), src=quote(source), dst_result=quote(destination + ".result"))
    else:
        command = 'cp --no-preserve=mode'
        if perserve_time:
            command += ' --preserve=timestamps'
        command += " {} {}".format(quote(source), quote(destination))
    print("%s : command: %s" % (self.name, command))
    (_, _, stderr) = self.ssh_conn.exec_command(command)
    error_msg = stderr.read()
    if len(error_msg) > 0:
        raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
785 def copy_remote_file(self
, remote_file
, use_incremental
):
786 ''' Copy a file from the repository to local folder and recursively
787 copy the backing files in case the remote file is incremental
788 Read and/or modified self.localinfo['files'] that contain the
789 unmodified copies of images in the local path
791 remote_file: path of remote file
792 use_incremental: None (leave the decision to this function), True, False
794 local_file: name of local file
795 qemu_info: dict with quemu information of local file
796 use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
799 use_incremental_out
= use_incremental
800 new_backing_file
= None
802 file_from_local
= True
804 #in case incremental use is not decided, take the decision depending on the image
805 #avoid the use of incremental if this image is already incremental
806 if remote_file
[0:4] == "http":
807 file_from_local
= False
809 qemu_remote_info
= self
.qemu_get_info(remote_file
)
810 if use_incremental_out
==None:
811 use_incremental_out
= not ( file_from_local
and 'backing file' in qemu_remote_info
)
812 #copy recursivelly the backing files
813 if file_from_local
and 'backing file' in qemu_remote_info
:
814 new_backing_file
, _
, _
= self
.copy_remote_file(qemu_remote_info
['backing file'], True)
816 #check if remote file is present locally
817 if use_incremental_out
and remote_file
in self
.localinfo
['files']:
818 local_file
= self
.localinfo
['files'][remote_file
]
819 local_file_info
= self
.get_file_info(local_file
)
821 remote_file_info
= self
.get_file_info(remote_file
)
822 if local_file_info
== None:
824 elif file_from_local
and (local_file_info
[4]!=remote_file_info
[4] or local_file_info
[5]!=remote_file_info
[5]):
825 #local copy of file not valid because date or size are different.
826 #TODO DELETE local file if this file is not used by any active virtual machine
828 self
.delete_file(local_file
)
829 del self
.localinfo
['files'][remote_file
]
833 else: #check that the local file has the same backing file, or there are not backing at all
834 qemu_info
= self
.qemu_get_info(local_file
)
835 if new_backing_file
!= qemu_info
.get('backing file'):
839 if local_file
== None: #copy the file
840 img_name
= remote_file
.split('/') [-1]
841 img_local
= self
.image_path
+ '/' + img_name
842 local_file
= self
.get_notused_filename(img_local
)
843 self
.copy_file(remote_file
, local_file
, use_incremental_out
)
845 if use_incremental_out
:
846 self
.localinfo
['files'][remote_file
] = local_file
848 self
.qemu_change_backing(local_file
, new_backing_file
)
849 qemu_info
= self
.qemu_get_info(local_file
)
851 return local_file
, qemu_info
, use_incremental_out
853 def launch_server(self
, conn
, server
, rebuild
=False, domain
=None):
855 time
.sleep(random
.randint(20,150)) #sleep random timeto be make it a bit more real
858 server_id
= server
['uuid']
859 paused
= server
.get('paused','no')
861 if domain
!=None and rebuild
==False:
863 #self.server_status[server_id] = 'ACTIVE'
866 self
.db_lock
.acquire()
867 result
, server_data
= self
.db
.get_instance(server_id
)
868 self
.db_lock
.release()
870 print self
.name
, ": launch_server ERROR getting server from DB",result
, server_data
871 return result
, server_data
873 #0: get image metadata
874 server_metadata
= server
.get('metadata', {})
875 use_incremental
= None
877 if "use_incremental" in server_metadata
:
878 use_incremental
= False if server_metadata
["use_incremental"]=="no" else True
880 server_host_files
= self
.localinfo
['server_files'].get( server
['uuid'], {})
882 #delete previous incremental files
883 for file_
in server_host_files
.values():
884 self
.delete_file(file_
['source file'] )
887 #1: obtain aditional devices (disks)
888 #Put as first device the main disk
889 devices
= [ {"type":"disk", "image_id":server
['image_id'], "vpci":server_metadata
.get('vpci', None) } ]
890 if 'extended' in server_data
and server_data
['extended']!=None and "devices" in server_data
['extended']:
891 devices
+= server_data
['extended']['devices']
894 if dev
['image_id'] == None:
897 self
.db_lock
.acquire()
898 result
, content
= self
.db
.get_table(FROM
='images', SELECT
=('path','metadata'),WHERE
={'uuid':dev
['image_id']} )
899 self
.db_lock
.release()
901 error_text
= "ERROR", result
, content
, "when getting image", dev
['image_id']
902 print self
.name
, ": launch_server", error_text
903 return -1, error_text
904 if content
[0]['metadata'] is not None:
905 dev
['metadata'] = json
.loads(content
[0]['metadata'])
909 if dev
['image_id'] in server_host_files
:
910 dev
['source file'] = server_host_files
[ dev
['image_id'] ] ['source file'] #local path
911 dev
['file format'] = server_host_files
[ dev
['image_id'] ] ['file format'] # raw or qcow2
914 #2: copy image to host
915 remote_file
= content
[0]['path']
916 use_incremental_image
= use_incremental
917 if dev
['metadata'].get("use_incremental") == "no":
918 use_incremental_image
= False
919 local_file
, qemu_info
, use_incremental_image
= self
.copy_remote_file(remote_file
, use_incremental_image
)
921 #create incremental image
922 if use_incremental_image
:
923 local_file_inc
= self
.get_notused_filename(local_file
, '.inc')
924 command
= 'qemu-img create -f qcow2 '+local_file_inc
+ ' -o backing_file='+ local_file
925 print 'command:', command
926 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
927 error_msg
= stderr
.read()
928 if len(error_msg
) > 0:
929 raise paramiko
.ssh_exception
.SSHException("Error creating incremental file: " + error_msg
)
930 local_file
= local_file_inc
931 qemu_info
= {'file format':'qcow2'}
933 server_host_files
[ dev
['image_id'] ] = {'source file': local_file
, 'file format': qemu_info
['file format']}
935 dev
['source file'] = local_file
936 dev
['file format'] = qemu_info
['file format']
938 self
.localinfo
['server_files'][ server
['uuid'] ] = server_host_files
939 self
.localinfo_dirty
= True
942 result
, xml
= self
.create_xml_server(server_data
, devices
, server_metadata
) #local_file
944 print self
.name
, ": create xml server error:", xml
946 print self
.name
, ": create xml:", xml
947 atribute
= lvirt_module
.VIR_DOMAIN_START_PAUSED
if paused
== "yes" else 0
949 if not rebuild
: #ensures that any pending destroying server is done
950 self
.server_forceoff(True)
951 #print self.name, ": launching instance" #, xml
952 conn
.createXML(xml
, atribute
)
953 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
957 except paramiko
.ssh_exception
.SSHException
as e
:
959 print self
.name
, ": launch_server(%s) ssh Exception: %s" %(server_id
, text
)
960 if "SSH session not active" in text
:
962 except lvirt_module
.libvirtError
as e
:
963 text
= e
.get_error_message()
964 print self
.name
, ": launch_server(%s) libvirt Exception: %s" %(server_id
, text
)
965 except Exception as e
:
967 print self
.name
, ": launch_server(%s) Exception: %s" %(server_id
, text
)
970 def update_servers_status(self
):
972 # VIR_DOMAIN_NOSTATE = 0
973 # VIR_DOMAIN_RUNNING = 1
974 # VIR_DOMAIN_BLOCKED = 2
975 # VIR_DOMAIN_PAUSED = 3
976 # VIR_DOMAIN_SHUTDOWN = 4
977 # VIR_DOMAIN_SHUTOFF = 5
978 # VIR_DOMAIN_CRASHED = 6
979 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended
981 if self
.test
or len(self
.server_status
)==0:
985 conn
= lvirt_module
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
986 domains
= conn
.listAllDomains()
988 for domain
in domains
:
989 uuid
= domain
.UUIDString() ;
990 libvirt_status
= domain
.state()
991 #print libvirt_status
992 if libvirt_status
[0] == lvirt_module
.VIR_DOMAIN_RUNNING
or libvirt_status
[0] == lvirt_module
.VIR_DOMAIN_SHUTDOWN
:
993 new_status
= "ACTIVE"
994 elif libvirt_status
[0] == lvirt_module
.VIR_DOMAIN_PAUSED
:
995 new_status
= "PAUSED"
996 elif libvirt_status
[0] == lvirt_module
.VIR_DOMAIN_SHUTOFF
:
997 new_status
= "INACTIVE"
998 elif libvirt_status
[0] == lvirt_module
.VIR_DOMAIN_CRASHED
:
1002 domain_dict
[uuid
] = new_status
1004 except lvirt_module
.libvirtError
as e
:
1005 print self
.name
, ": get_state() Exception '", e
.get_error_message()
1008 for server_id
, current_status
in self
.server_status
.iteritems():
1010 if server_id
in domain_dict
:
1011 new_status
= domain_dict
[server_id
]
1013 new_status
= "INACTIVE"
1015 if new_status
== None or new_status
== current_status
:
1017 if new_status
== 'INACTIVE' and current_status
== 'ERROR':
1018 continue #keep ERROR status, because obviously this machine is not running
1020 print self
.name
, ": server ", server_id
, "status change from ", current_status
, "to", new_status
1021 STATUS
={'progress':100, 'status':new_status
}
1022 if new_status
== 'ERROR':
1023 STATUS
['last_error'] = 'machine has crashed'
1024 self
.db_lock
.acquire()
1025 r
,_
= self
.db
.update_rows('instances', STATUS
, {'uuid':server_id
}, log
=False)
1026 self
.db_lock
.release()
1028 self
.server_status
[server_id
] = new_status
1030 def action_on_server(self
, req
, last_retry
=True):
1031 '''Perform an action on a req
1033 req: dictionary that contain:
1034 server properties: 'uuid','name','tenant_id','status'
1036 host properties: 'user', 'ip_name'
1037 return (error, text)
1038 0: No error. VM is updated to new state,
1039 -1: Invalid action, as trying to pause a PAUSED VM
1040 -2: Error accessing host
1042 -4: Error at DB access
1043 -5: Error while trying to perform action. VM is updated to ERROR
1045 server_id
= req
['uuid']
1048 old_status
= req
['status']
1052 if 'terminate' in req
['action']:
1053 new_status
= 'deleted'
1054 elif 'shutoff' in req
['action'] or 'shutdown' in req
['action'] or 'forceOff' in req
['action']:
1055 if req
['status']!='ERROR':
1057 new_status
= 'INACTIVE'
1058 elif 'start' in req
['action'] and req
['status']!='ERROR': new_status
= 'ACTIVE'
1059 elif 'resume' in req
['action'] and req
['status']!='ERROR' and req
['status']!='INACTIVE' : new_status
= 'ACTIVE'
1060 elif 'pause' in req
['action'] and req
['status']!='ERROR': new_status
= 'PAUSED'
1061 elif 'reboot' in req
['action'] and req
['status']!='ERROR': new_status
= 'ACTIVE'
1062 elif 'rebuild' in req
['action']:
1063 time
.sleep(random
.randint(20,150))
1064 new_status
= 'ACTIVE'
1065 elif 'createImage' in req
['action']:
1067 self
.create_image(None, req
)
1070 conn
= lvirt_module
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1072 dom
= conn
.lookupByUUIDString(server_id
)
1073 except lvirt_module
.libvirtError
as e
:
1074 text
= e
.get_error_message()
1075 if 'LookupByUUIDString' in text
or 'Domain not found' in text
or 'No existe un dominio coincidente' in text
:
1078 print self
.name
, ": action_on_server(",server_id
,") libvirt exception:", text
1081 if 'forceOff' in req
['action']:
1083 print self
.name
, ": action_on_server(",server_id
,") domain not running"
1086 print self
.name
, ": sending DESTROY to server", server_id
1088 except Exception as e
:
1089 if "domain is not running" not in e
.get_error_message():
1090 print self
.name
, ": action_on_server(",server_id
,") Exception while sending force off:", e
.get_error_message()
1091 last_error
= 'action_on_server Exception while destroy: ' + e
.get_error_message()
1092 new_status
= 'ERROR'
1094 elif 'terminate' in req
['action']:
1096 print self
.name
, ": action_on_server(",server_id
,") domain not running"
1097 new_status
= 'deleted'
1100 if req
['action']['terminate'] == 'force':
1101 print self
.name
, ": sending DESTROY to server", server_id
1103 new_status
= 'deleted'
1105 print self
.name
, ": sending SHUTDOWN to server", server_id
1107 self
.pending_terminate_server
.append( (time
.time()+10,server_id
) )
1108 except Exception as e
:
1109 print self
.name
, ": action_on_server(",server_id
,") Exception while destroy:", e
.get_error_message()
1110 last_error
= 'action_on_server Exception while destroy: ' + e
.get_error_message()
1111 new_status
= 'ERROR'
1112 if "domain is not running" in e
.get_error_message():
1115 new_status
= 'deleted'
1117 print self
.name
, ": action_on_server(",server_id
,") Exception while undefine:", e
.get_error_message()
1118 last_error
= 'action_on_server Exception2 while undefine:', e
.get_error_message()
1119 #Exception: 'virDomainDetachDevice() failed'
1120 if new_status
=='deleted':
1121 if server_id
in self
.server_status
:
1122 del self
.server_status
[server_id
]
1123 if req
['uuid'] in self
.localinfo
['server_files']:
1124 for file_
in self
.localinfo
['server_files'][ req
['uuid'] ].values():
1126 self
.delete_file(file_
['source file'])
1129 del self
.localinfo
['server_files'][ req
['uuid'] ]
1130 self
.localinfo_dirty
= True
1132 elif 'shutoff' in req
['action'] or 'shutdown' in req
['action']:
1135 print self
.name
, ": action_on_server(",server_id
,") domain not running"
1138 # new_status = 'INACTIVE'
1139 #TODO: check status for changing at database
1140 except Exception as e
:
1141 new_status
= 'ERROR'
1142 print self
.name
, ": action_on_server(",server_id
,") Exception while shutdown:", e
.get_error_message()
1143 last_error
= 'action_on_server Exception while shutdown: ' + e
.get_error_message()
1145 elif 'rebuild' in req
['action']:
1148 r
= self
.launch_server(conn
, req
, True, None)
1150 new_status
= 'ERROR'
1153 new_status
= 'ACTIVE'
1154 elif 'start' in req
['action']:
1155 #La instancia está sólo en la base de datos pero no en la libvirt. es necesario crearla
1156 rebuild
= True if req
['action']['start']=='rebuild' else False
1157 r
= self
.launch_server(conn
, req
, rebuild
, dom
)
1159 new_status
= 'ERROR'
1162 new_status
= 'ACTIVE'
1164 elif 'resume' in req
['action']:
1170 # new_status = 'ACTIVE'
1171 except Exception as e
:
1172 print self
.name
, ": action_on_server(",server_id
,") Exception while resume:", e
.get_error_message()
1174 elif 'pause' in req
['action']:
1180 # new_status = 'PAUSED'
1181 except Exception as e
:
1182 print self
.name
, ": action_on_server(",server_id
,") Exception while pause:", e
.get_error_message()
1184 elif 'reboot' in req
['action']:
1190 print self
.name
, ": action_on_server(",server_id
,") reboot:"
1191 #new_status = 'ACTIVE'
1192 except Exception as e
:
1193 print self
.name
, ": action_on_server(",server_id
,") Exception while reboot:", e
.get_error_message()
1194 elif 'createImage' in req
['action']:
1195 self
.create_image(dom
, req
)
1199 except lvirt_module
.libvirtError
as e
:
1200 if conn
is not None: conn
.close
1201 text
= e
.get_error_message()
1202 new_status
= "ERROR"
1204 print self
.name
, ": action_on_server(",server_id
,") Exception '", text
1205 if 'LookupByUUIDString' in text
or 'Domain not found' in text
or 'No existe un dominio coincidente' in text
:
1206 print self
.name
, ": action_on_server(",server_id
,") Exception removed from host"
1207 #end of if self.test
1208 if new_status
== None:
1211 print self
.name
, ": action_on_server(",server_id
,") new status", new_status
, last_error
1212 UPDATE
= {'progress':100, 'status':new_status
}
1214 if new_status
=='ERROR':
1215 if not last_retry
: #if there will be another retry do not update database
1217 elif 'terminate' in req
['action']:
1218 #PUT a log in the database
1219 print self
.name
, ": PANIC deleting server", server_id
, last_error
1220 self
.db_lock
.acquire()
1221 self
.db
.new_row('logs',
1222 {'uuid':server_id
, 'tenant_id':req
['tenant_id'], 'related':'instances','level':'panic',
1223 'description':'PANIC deleting server from host '+self
.name
+': '+last_error
}
1225 self
.db_lock
.release()
1226 if server_id
in self
.server_status
:
1227 del self
.server_status
[server_id
]
1230 UPDATE
['last_error'] = last_error
1231 if new_status
!= 'deleted' and (new_status
!= old_status
or new_status
== 'ERROR') :
1232 self
.db_lock
.acquire()
1233 self
.db
.update_rows('instances', UPDATE
, {'uuid':server_id
}, log
=True)
1234 self
.server_status
[server_id
] = new_status
1235 self
.db_lock
.release()
1236 if new_status
== 'ERROR':
1241 def restore_iface(self
, name
, mac
, lib_conn
=None):
1242 ''' make an ifdown, ifup to restore default parameter of na interface
1244 mac: mac address of the interface
1245 lib_conn: connection to the libvirt, if None a new connection is created
1246 Return 0,None if ok, -1,text if fails
1252 print self
.name
, ": restore_iface '%s' %s" % (name
, mac
)
1256 conn
= lvirt_module
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1260 #wait to the pending VM deletion
1261 #TODO.Revise self.server_forceoff(True)
1263 iface
= conn
.interfaceLookupByMACString(mac
)
1266 print self
.name
, ": restore_iface '%s' %s" % (name
, mac
)
1267 except lvirt_module
.libvirtError
as e
:
1268 error_text
= e
.get_error_message()
1269 print self
.name
, ": restore_iface '%s' '%s' libvirt exception: %s" %(name
, mac
, error_text
)
1272 if lib_conn
is None and conn
is not None:
1274 return ret
, error_text
1277 def create_image(self
,dom
, req
):
1279 if 'path' in req
['action']['createImage']:
1280 file_dst
= req
['action']['createImage']['path']
1282 createImage
=req
['action']['createImage']
1283 img_name
= createImage
['source']['path']
1284 index
=img_name
.rfind('/')
1285 file_dst
= self
.get_notused_path(img_name
[:index
+1] + createImage
['name'] + '.qcow2')
1286 image_status
='ACTIVE'
1290 server_id
= req
['uuid']
1291 createImage
=req
['action']['createImage']
1292 file_orig
= self
.localinfo
['server_files'][server_id
] [ createImage
['source']['image_id'] ] ['source file']
1293 if 'path' in req
['action']['createImage']:
1294 file_dst
= req
['action']['createImage']['path']
1296 img_name
= createImage
['source']['path']
1297 index
=img_name
.rfind('/')
1298 file_dst
= self
.get_notused_filename(img_name
[:index
+1] + createImage
['name'] + '.qcow2')
1300 self
.copy_file(file_orig
, file_dst
)
1301 qemu_info
= self
.qemu_get_info(file_orig
)
1302 if 'backing file' in qemu_info
:
1303 for k
,v
in self
.localinfo
['files'].items():
1304 if v
==qemu_info
['backing file']:
1305 self
.qemu_change_backing(file_dst
, k
)
1307 image_status
='ACTIVE'
1309 except paramiko
.ssh_exception
.SSHException
as e
:
1310 image_status
='ERROR'
1311 error_text
= e
.args
[0]
1312 print self
.name
, "': create_image(",server_id
,") ssh Exception:", error_text
1313 if "SSH session not active" in error_text
and retry
==0:
1315 except Exception as e
:
1316 image_status
='ERROR'
1318 print self
.name
, "': create_image(",server_id
,") Exception:", error_text
1320 #TODO insert a last_error at database
1321 self
.db_lock
.acquire()
1322 self
.db
.update_rows('images', {'status':image_status
, 'progress': 100, 'path':file_dst
},
1323 {'uuid':req
['new_image']['uuid']}, log
=True)
1324 self
.db_lock
.release()
1326 def edit_iface(self
, port_id
, old_net
, new_net
):
1327 #This action imply remove and insert interface to put proper parameters
1332 self
.db_lock
.acquire()
1333 r
,c
= self
.db
.get_table(FROM
='ports as p join resources_port as rp on p.uuid=rp.port_id',
1334 WHERE
={'port_id': port_id
})
1335 self
.db_lock
.release()
1337 print self
.name
, ": edit_iface(",port_id
,") DDBB error:", c
1340 print self
.name
, ": edit_iface(",port_id
,") por not found"
1343 if port
["model"]!="VF":
1344 print self
.name
, ": edit_iface(",port_id
,") ERROR model must be VF"
1346 #create xml detach file
1349 xml
.append("<interface type='hostdev' managed='yes'>")
1350 xml
.append(" <mac address='" +port
['mac']+ "'/>")
1351 xml
.append(" <source>"+ self
.pci2xml(port
['pci'])+"\n </source>")
1352 xml
.append('</interface>')
1357 conn
= lvirt_module
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1358 dom
= conn
.lookupByUUIDString(port
["instance_id"])
1361 print self
.name
, ": edit_iface detaching SRIOV interface", text
1362 dom
.detachDeviceFlags(text
, flags
=lvirt_module
.VIR_DOMAIN_AFFECT_LIVE
)
1364 xml
[-1] =" <vlan> <tag id='" + str(port
['vlan']) + "'/> </vlan>"
1366 xml
.append(self
.pci2xml(port
.get('vpci',None)) )
1367 xml
.append('</interface>')
1369 print self
.name
, ": edit_iface attaching SRIOV interface", text
1370 dom
.attachDeviceFlags(text
, flags
=lvirt_module
.VIR_DOMAIN_AFFECT_LIVE
)
1372 except lvirt_module
.libvirtError
as e
:
1373 text
= e
.get_error_message()
1374 print self
.name
, ": edit_iface(",port
["instance_id"],") libvirt exception:", text
1377 if conn
is not None: conn
.close
1380 def create_server(server
, db
, db_lock
, only_of_ports
):
1387 # host_id = server.get('host_id', None)
1388 extended
= server
.get('extended', None)
1390 # print '----------------------'
1391 # print json.dumps(extended, indent=4)
1394 requirements
['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1395 requirements
['ram'] = server
['flavor'].get('ram', 0)
1396 if requirements
['ram']== None:
1397 requirements
['ram'] = 0
1398 requirements
['vcpus'] = server
['flavor'].get('vcpus', 0)
1399 if requirements
['vcpus']== None:
1400 requirements
['vcpus'] = 0
1401 #If extended is not defined get requirements from flavor
1402 if extended
is None:
1403 #If extended is defined in flavor convert to dictionary and use it
1404 if 'extended' in server
['flavor'] and server
['flavor']['extended'] != None:
1405 json_acceptable_string
= server
['flavor']['extended'].replace("'", "\"")
1406 extended
= json
.loads(json_acceptable_string
)
1409 #print json.dumps(extended, indent=4)
1411 #For simplicity only one numa VM are supported in the initial implementation
1412 if extended
!= None:
1413 numas
= extended
.get('numas', [])
1415 return (-2, "Multi-NUMA VMs are not supported yet")
1417 # return (-1, "At least one numa must be specified")
1419 #a for loop is used in order to be ready to multi-NUMA VMs
1423 numa_req
['memory'] = numa
.get('memory', 0)
1425 numa_req
['proc_req_nb'] = numa
['cores'] #number of cores or threads to be reserved
1426 numa_req
['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1427 numa_req
['proc_req_list'] = numa
.get('cores-id', None) #list of ids to be assigned to the cores or threads
1428 elif 'paired-threads' in numa
:
1429 numa_req
['proc_req_nb'] = numa
['paired-threads']
1430 numa_req
['proc_req_type'] = 'paired-threads'
1431 numa_req
['proc_req_list'] = numa
.get('paired-threads-id', None)
1432 elif 'threads' in numa
:
1433 numa_req
['proc_req_nb'] = numa
['threads']
1434 numa_req
['proc_req_type'] = 'threads'
1435 numa_req
['proc_req_list'] = numa
.get('threads-id', None)
1437 numa_req
['proc_req_nb'] = 0 # by default
1438 numa_req
['proc_req_type'] = 'threads'
1442 #Generate a list of sriov and another for physical interfaces
1443 interfaces
= numa
.get('interfaces', [])
1446 for iface
in interfaces
:
1447 iface
['bandwidth'] = int(iface
['bandwidth'])
1448 if iface
['dedicated'][:3]=='yes':
1449 port_list
.append(iface
)
1451 sriov_list
.append(iface
)
1453 #Save lists ordered from more restrictive to less bw requirements
1454 numa_req
['sriov_list'] = sorted(sriov_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1455 numa_req
['port_list'] = sorted(port_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1458 request
.append(numa_req
)
1460 # print "----------\n"+json.dumps(request[0], indent=4)
1461 # print '----------\n\n'
1463 #Search in db for an appropriate numa for each requested numa
1464 #at the moment multi-NUMA VMs are not supported
1466 requirements
['numa'].update(request
[0])
1467 if requirements
['numa']['memory']>0:
1468 requirements
['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
1469 elif requirements
['ram']==0:
1470 return (-1, "Memory information not set neither at extended field not at ram")
1471 if requirements
['numa']['proc_req_nb']>0:
1472 requirements
['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
1473 elif requirements
['vcpus']==0:
1474 return (-1, "Processor information not set neither at extended field not at vcpus")
1478 result
, content
= db
.get_numas(requirements
, server
.get('host_id', None), only_of_ports
)
1482 return (-1, content
)
1484 numa_id
= content
['numa_id']
1485 host_id
= content
['host_id']
1487 #obtain threads_id and calculate pinning
1490 if requirements
['numa']['proc_req_nb']>0:
1492 result
, content
= db
.get_table(FROM
='resources_core',
1493 SELECT
=('id','core_id','thread_id'),
1494 WHERE
={'numa_id':numa_id
,'instance_id': None, 'status':'ok'} )
1500 #convert rows to a dictionary indexed by core_id
1503 if not row
['core_id'] in cores_dict
:
1504 cores_dict
[row
['core_id']] = []
1505 cores_dict
[row
['core_id']].append([row
['thread_id'],row
['id']])
1507 #In case full cores are requested
1509 if requirements
['numa']['proc_req_type'] == 'cores':
1510 #Get/create the list of the vcpu_ids
1511 vcpu_id_list
= requirements
['numa']['proc_req_list']
1512 if vcpu_id_list
== None:
1513 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1515 for threads
in cores_dict
.itervalues():
1517 if len(threads
) != 2:
1520 #set pinning for the first thread
1521 cpu_pinning
.append( [ vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1] ] )
1523 #reserve so it is not used the second thread
1524 reserved_threads
.append(threads
[1][1])
1526 if len(vcpu_id_list
) == 0:
1529 #In case paired threads are requested
1530 elif requirements
['numa']['proc_req_type'] == 'paired-threads':
1532 #Get/create the list of the vcpu_ids
1533 if requirements
['numa']['proc_req_list'] != None:
1535 for pair
in requirements
['numa']['proc_req_list']:
1537 return -1, "Field paired-threads-id not properly specified"
1539 vcpu_id_list
.append(pair
[0])
1540 vcpu_id_list
.append(pair
[1])
1542 vcpu_id_list
= range(0,2*int(requirements
['numa']['proc_req_nb']))
1544 for threads
in cores_dict
.itervalues():
1546 if len(threads
) != 2:
1548 #set pinning for the first thread
1549 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1551 #set pinning for the second thread
1552 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1554 if len(vcpu_id_list
) == 0:
1557 #In case normal threads are requested
1558 elif requirements
['numa']['proc_req_type'] == 'threads':
1559 #Get/create the list of the vcpu_ids
1560 vcpu_id_list
= requirements
['numa']['proc_req_list']
1561 if vcpu_id_list
== None:
1562 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1564 for threads_index
in sorted(cores_dict
, key
=lambda k
: len(cores_dict
[k
])):
1565 threads
= cores_dict
[threads_index
]
1566 #set pinning for the first thread
1567 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1569 #if exists, set pinning for the second thread
1570 if len(threads
) == 2 and len(vcpu_id_list
) != 0:
1571 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1573 if len(vcpu_id_list
) == 0:
1576 #Get the source pci addresses for the selected numa
1577 used_sriov_ports
= []
1578 for port
in requirements
['numa']['sriov_list']:
1580 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1586 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1588 port
['pci'] = row
['pci']
1589 if 'mac_address' not in port
:
1590 port
['mac_address'] = row
['mac']
1592 port
['port_id']=row
['id']
1593 port
['Mbps_used'] = port
['bandwidth']
1594 used_sriov_ports
.append(row
['id'])
1597 for port
in requirements
['numa']['port_list']:
1598 port
['Mbps_used'] = None
1599 if port
['dedicated'] != "yes:sriov":
1600 port
['mac_address'] = port
['mac']
1604 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac', 'Mbps'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1609 port
['Mbps_used'] = content
[0]['Mbps']
1611 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1613 port
['pci'] = row
['pci']
1614 if 'mac_address' not in port
:
1615 port
['mac_address'] = row
['mac'] # mac cannot be set to passthrough ports
1617 port
['port_id']=row
['id']
1618 used_sriov_ports
.append(row
['id'])
1621 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1622 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1624 server
['host_id'] = host_id
1627 #Generate dictionary for saving in db the instance resources
1629 resources
['bridged-ifaces'] = []
1632 numa_dict
['interfaces'] = []
1634 numa_dict
['interfaces'] += requirements
['numa']['port_list']
1635 numa_dict
['interfaces'] += requirements
['numa']['sriov_list']
1637 #Check bridge information
1638 unified_dataplane_iface
=[]
1639 unified_dataplane_iface
+= requirements
['numa']['port_list']
1640 unified_dataplane_iface
+= requirements
['numa']['sriov_list']
1642 for control_iface
in server
.get('networks', []):
1643 control_iface
['net_id']=control_iface
.pop('uuid')
1644 #Get the brifge name
1646 result
, content
= db
.get_table(FROM
='nets', SELECT
=('name','type', 'vlan'),WHERE
={'uuid':control_iface
['net_id']} )
1651 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface
['net_id']
1654 if control_iface
.get("type", 'virtual') == 'virtual':
1655 if network
['type']!='bridge_data' and network
['type']!='bridge_man':
1656 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface
['net_id']
1657 resources
['bridged-ifaces'].append(control_iface
)
1659 if network
['type']!='data' and network
['type']!='ptp':
1660 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface
['net_id']
1661 #dataplane interface, look for it in the numa tree and asign this network
1663 for dataplane_iface
in numa_dict
['interfaces']:
1664 if dataplane_iface
['name'] == control_iface
.get("name"):
1665 if (dataplane_iface
['dedicated'] == "yes" and control_iface
["type"] != "PF") or \
1666 (dataplane_iface
['dedicated'] == "no" and control_iface
["type"] != "VF") or \
1667 (dataplane_iface
['dedicated'] == "yes:sriov" and control_iface
["type"] != "VFnotShared") :
1668 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1669 (control_iface
.get("name"), dataplane_iface
['dedicated'], control_iface
["type"])
1670 dataplane_iface
['uuid'] = control_iface
['net_id']
1671 if dataplane_iface
['dedicated'] == "no":
1672 dataplane_iface
['vlan'] = network
['vlan']
1673 if dataplane_iface
['dedicated'] != "yes" and control_iface
.get("mac_address"):
1674 dataplane_iface
['mac_address'] = control_iface
.get("mac_address")
1675 if control_iface
.get("vpci"):
1676 dataplane_iface
['vpci'] = control_iface
.get("vpci")
1680 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface
.get("name")
1682 resources
['host_id'] = host_id
1683 resources
['image_id'] = server
['image_id']
1684 resources
['flavor_id'] = server
['flavor_id']
1685 resources
['tenant_id'] = server
['tenant_id']
1686 resources
['ram'] = requirements
['ram']
1687 resources
['vcpus'] = requirements
['vcpus']
1688 resources
['status'] = 'CREATING'
1690 if 'description' in server
: resources
['description'] = server
['description']
1691 if 'name' in server
: resources
['name'] = server
['name']
1693 resources
['extended'] = {} #optional
1694 resources
['extended']['numas'] = []
1695 numa_dict
['numa_id'] = numa_id
1696 numa_dict
['memory'] = requirements
['numa']['memory']
1697 numa_dict
['cores'] = []
1699 for core
in cpu_pinning
:
1700 numa_dict
['cores'].append({'id': core
[2], 'vthread': core
[0], 'paired': paired
})
1701 for core
in reserved_threads
:
1702 numa_dict
['cores'].append({'id': core
})
1703 resources
['extended']['numas'].append(numa_dict
)
1704 if extended
!=None and 'devices' in extended
: #TODO allow extra devices without numa
1705 resources
['extended']['devices'] = extended
['devices']
1708 print '===================================={'
1709 print json
.dumps(resources
, indent
=4)
1710 print '====================================}'