1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
25 This is thread that interact with the host and the libvirt to manage VM
26 One thread will be launched per host
28 __author__
="Pablo Montes, Alfonso Tierno"
29 __date__
="$10-jul-2014 12:07:15$"
38 from jsonschema
import validate
as js_v
, exceptions
as js_e
40 from vim_schema
import localinfo_schema
, hostinfo_schema
42 #from logging import Logger
43 #import auxiliary_functions as af
45 #TODO: insert a logging system
47 class host_thread(threading
.Thread
):
48 def __init__(self
, name
, host
, user
, db
, db_lock
, test
, image_path
, host_id
, version
, develop_mode
, develop_bridge_iface
):
53 'host','user': host ip or name to manage and user
54 'db', 'db_lock': database class and lock to use it in exclusion
56 threading
.Thread
.__init
__(self
)
61 self
.db_lock
= db_lock
63 self
.develop_mode
= develop_mode
64 self
.develop_bridge_iface
= develop_bridge_iface
65 self
.image_path
= image_path
66 self
.host_id
= host_id
67 self
.version
= version
72 self
.server_status
= {} #dictionary with pairs server_uuid:server_status
73 self
.pending_terminate_server
=[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
74 self
.next_update_server_status
= 0 #time when must be check servers status
78 self
.queueLock
= threading
.Lock()
79 self
.taskQueue
= Queue
.Queue(2000)
81 def ssh_connect(self
):
84 self
.ssh_conn
= paramiko
.SSHClient()
85 self
.ssh_conn
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
86 self
.ssh_conn
.load_system_host_keys()
87 self
.ssh_conn
.connect(self
.host
, username
=self
.user
, timeout
=10) #, None)
88 except paramiko
.ssh_exception
.SSHException
as e
:
90 print self
.name
, ": ssh_connect ssh Exception:", text
92 def load_localinfo(self
):
98 command
= 'mkdir -p ' + self
.image_path
99 #print self.name, ': command:', command
100 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
101 content
= stderr
.read()
103 print self
.name
, ': command:', command
, "stderr:", content
105 command
= 'cat ' + self
.image_path
+ '/.openvim.yaml'
106 #print self.name, ': command:', command
107 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
108 content
= stdout
.read()
109 if len(content
) == 0:
110 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
111 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
112 self
.localinfo
= yaml
.load(content
)
113 js_v(self
.localinfo
, localinfo_schema
)
114 self
.localinfo_dirty
=False
115 if 'server_files' not in self
.localinfo
:
116 self
.localinfo
['server_files'] = {}
117 print self
.name
, ': localinfo load from host'
120 except paramiko
.ssh_exception
.SSHException
as e
:
122 print self
.name
, ": load_localinfo ssh Exception:", text
123 except libvirt
.libvirtError
as e
:
124 text
= e
.get_error_message()
125 print self
.name
, ": load_localinfo libvirt Exception:", text
126 except yaml
.YAMLError
as exc
:
128 if hasattr(exc
, 'problem_mark'):
129 mark
= exc
.problem_mark
130 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
131 print self
.name
, ": load_localinfo yaml format Exception", text
132 except js_e
.ValidationError
as e
:
134 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
135 print self
.name
, ": load_localinfo format Exception:", text
, e
.message
136 except Exception as e
:
138 print self
.name
, ": load_localinfo Exception:", text
140 #not loaded, insert a default data and force saving by activating dirty flag
141 self
.localinfo
= {'files':{}, 'server_files':{} }
142 #self.localinfo_dirty=True
143 self
.localinfo_dirty
=False
145 def load_hostinfo(self
):
153 command
= 'cat ' + self
.image_path
+ '/hostinfo.yaml'
154 #print self.name, ': command:', command
155 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
156 content
= stdout
.read()
157 if len(content
) == 0:
158 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
159 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
160 self
.hostinfo
= yaml
.load(content
)
161 js_v(self
.hostinfo
, hostinfo_schema
)
162 print self
.name
, ': hostlinfo load from host', self
.hostinfo
165 except paramiko
.ssh_exception
.SSHException
as e
:
167 print self
.name
, ": load_hostinfo ssh Exception:", text
168 except libvirt
.libvirtError
as e
:
169 text
= e
.get_error_message()
170 print self
.name
, ": load_hostinfo libvirt Exception:", text
171 except yaml
.YAMLError
as exc
:
173 if hasattr(exc
, 'problem_mark'):
174 mark
= exc
.problem_mark
175 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
176 print self
.name
, ": load_hostinfo yaml format Exception", text
177 except js_e
.ValidationError
as e
:
179 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
180 print self
.name
, ": load_hostinfo format Exception:", text
, e
.message
181 except Exception as e
:
183 print self
.name
, ": load_hostinfo Exception:", text
185 #not loaded, insert a default data
188 def save_localinfo(self
, tries
=3):
190 self
.localinfo_dirty
= False
197 command
= 'cat > ' + self
.image_path
+ '/.openvim.yaml'
198 print self
.name
, ': command:', command
199 (stdin
, _
, _
) = self
.ssh_conn
.exec_command(command
)
200 yaml
.safe_dump(self
.localinfo
, stdin
, explicit_start
=True, indent
=4, default_flow_style
=False, tags
=False, encoding
='utf-8', allow_unicode
=True)
201 self
.localinfo_dirty
= False
204 except paramiko
.ssh_exception
.SSHException
as e
:
206 print self
.name
, ": save_localinfo ssh Exception:", text
207 if "SSH session not active" in text
:
209 except libvirt
.libvirtError
as e
:
210 text
= e
.get_error_message()
211 print self
.name
, ": save_localinfo libvirt Exception:", text
212 except yaml
.YAMLError
as exc
:
214 if hasattr(exc
, 'problem_mark'):
215 mark
= exc
.problem_mark
216 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
217 print self
.name
, ": save_localinfo yaml format Exception", text
218 except Exception as e
:
220 print self
.name
, ": save_localinfo Exception:", text
222 def load_servers_from_db(self
):
223 self
.db_lock
.acquire()
224 r
,c
= self
.db
.get_table(SELECT
=('uuid','status', 'image_id'), FROM
='instances', WHERE
={'host_id': self
.host_id
})
225 self
.db_lock
.release()
227 self
.server_status
= {}
229 print self
.name
, ": Error getting data from database:", c
232 self
.server_status
[ server
['uuid'] ] = server
['status']
234 #convert from old version to new one
235 if 'inc_files' in self
.localinfo
and server
['uuid'] in self
.localinfo
['inc_files']:
236 server_files_dict
= {'source file': self
.localinfo
['inc_files'][ server
['uuid'] ] [0], 'file format':'raw' }
237 if server_files_dict
['source file'][-5:] == 'qcow2':
238 server_files_dict
['file format'] = 'qcow2'
240 self
.localinfo
['server_files'][ server
['uuid'] ] = { server
['image_id'] : server_files_dict
}
241 if 'inc_files' in self
.localinfo
:
242 del self
.localinfo
['inc_files']
243 self
.localinfo_dirty
= True
245 def delete_unused_files(self
):
246 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
247 Deletes unused entries at self.loacalinfo and the corresponding local files.
248 The only reason for this mismatch is the manual deletion of instances (VM) at database
252 for uuid
,images
in self
.localinfo
['server_files'].items():
253 if uuid
not in self
.server_status
:
254 for localfile
in images
.values():
256 print self
.name
, ": deleting file '%s' of unused server '%s'" %(localfile
['source file'], uuid
)
257 self
.delete_file(localfile
['source file'])
258 except paramiko
.ssh_exception
.SSHException
as e
:
259 print self
.name
, ": Exception deleting file '%s': %s" %(localfile
['source file'], str(e
))
260 del self
.localinfo
['server_files'][uuid
]
261 self
.localinfo_dirty
= True
263 def insert_task(self
, task
, *aditional
):
265 self
.queueLock
.acquire()
266 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
267 self
.queueLock
.release()
270 return -1, "timeout inserting a task over host " + self
.name
274 self
.load_localinfo()
276 self
.load_servers_from_db()
277 self
.delete_unused_files()
279 self
.queueLock
.acquire()
280 if not self
.taskQueue
.empty():
281 task
= self
.taskQueue
.get()
284 self
.queueLock
.release()
288 if self
.localinfo_dirty
:
289 self
.save_localinfo()
290 elif self
.next_update_server_status
< now
:
291 self
.update_servers_status()
292 self
.next_update_server_status
= now
+ 5
293 elif len(self
.pending_terminate_server
)>0 and self
.pending_terminate_server
[0][0]<now
:
294 self
.server_forceoff()
299 if task
[0] == 'instance':
300 print self
.name
, ": processing task instance", task
[1]['action']
304 r
=self
.action_on_server(task
[1], retry
==2)
307 elif task
[0] == 'image':
309 elif task
[0] == 'exit':
310 print self
.name
, ": processing task exit"
313 elif task
[0] == 'reload':
314 print self
.name
, ": processing task reload terminating and relaunching"
317 elif task
[0] == 'edit-iface':
318 print self
.name
, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task
[1], task
[2], task
[3])
319 self
.edit_iface(task
[1], task
[2], task
[3])
320 elif task
[0] == 'restore-iface':
321 print self
.name
, ": processing task restore-iface %s mac=%s" % (task
[1], task
[2])
322 self
.restore_iface(task
[1], task
[2])
324 print self
.name
, ": unknown task", task
326 def server_forceoff(self
, wait_until_finished
=False):
327 while len(self
.pending_terminate_server
)>0:
329 if self
.pending_terminate_server
[0][0]>now
:
330 if wait_until_finished
:
335 req
={'uuid':self
.pending_terminate_server
[0][1],
336 'action':{'terminate':'force'},
339 self
.action_on_server(req
)
340 self
.pending_terminate_server
.pop(0)
344 self
.server_forceoff(True)
345 if self
.localinfo_dirty
:
346 self
.save_localinfo()
348 self
.ssh_conn
.close()
349 except Exception as e
:
351 print self
.name
, ": terminate Exception:", text
352 print self
.name
, ": exit from host_thread"
354 def get_local_iface_name(self
, generic_name
):
355 if self
.hostinfo
!= None and "iface_names" in self
.hostinfo
and generic_name
in self
.hostinfo
["iface_names"]:
356 return self
.hostinfo
["iface_names"][generic_name
]
359 def create_xml_server(self
, server
, dev_list
, server_metadata
={}):
360 """Function that implements the generation of the VM XML definition.
361 Additional devices are in dev_list list
362 The main disk is upon dev_list[0]"""
364 #get if operating system is Windows
366 os_type
= server_metadata
.get('os_type', None)
367 if os_type
== None and 'metadata' in dev_list
[0]:
368 os_type
= dev_list
[0]['metadata'].get('os_type', None)
369 if os_type
!= None and os_type
.lower() == "windows":
371 #get type of hard disk bus
372 bus_ide
= True if windows_os
else False
373 bus
= server_metadata
.get('bus', None)
374 if bus
== None and 'metadata' in dev_list
[0]:
375 bus
= dev_list
[0]['metadata'].get('bus', None)
377 bus_ide
= True if bus
=='ide' else False
381 text
= "<domain type='kvm'>"
383 topo
= server_metadata
.get('topology', None)
384 if topo
== None and 'metadata' in dev_list
[0]:
385 topo
= dev_list
[0]['metadata'].get('topology', None)
387 name
= server
.get('name','') + "_" + server
['uuid']
388 name
= name
[:58] #qemu impose a length limit of 59 chars or not start. Using 58
389 text
+= self
.inc_tab() + "<name>" + name
+ "</name>"
391 text
+= self
.tab() + "<uuid>" + server
['uuid'] + "</uuid>"
394 if 'extended' in server
and server
['extended']!=None and 'numas' in server
['extended']:
395 numa
= server
['extended']['numas'][0]
398 memory
= int(numa
.get('memory',0))*1024*1024 #in KiB
400 memory
= int(server
['ram'])*1024;
402 if not self
.develop_mode
:
405 return -1, 'No memory assigned to instance'
407 text
+= self
.tab() + "<memory unit='KiB'>" +memory
+"</memory>"
408 text
+= self
.tab() + "<currentMemory unit='KiB'>" +memory
+ "</currentMemory>"
410 text
+= self
.tab()+'<memoryBacking>'+ \
411 self
.inc_tab() + '<hugepages/>'+ \
412 self
.dec_tab()+ '</memoryBacking>'
415 use_cpu_pinning
=False
416 vcpus
= int(server
.get("vcpus",0))
418 if 'cores-source' in numa
:
420 for index
in range(0, len(numa
['cores-source'])):
421 cpu_pinning
.append( [ numa
['cores-id'][index
], numa
['cores-source'][index
] ] )
423 if 'threads-source' in numa
:
425 for index
in range(0, len(numa
['threads-source'])):
426 cpu_pinning
.append( [ numa
['threads-id'][index
], numa
['threads-source'][index
] ] )
428 if 'paired-threads-source' in numa
:
430 for index
in range(0, len(numa
['paired-threads-source'])):
431 cpu_pinning
.append( [numa
['paired-threads-id'][index
][0], numa
['paired-threads-source'][index
][0] ] )
432 cpu_pinning
.append( [numa
['paired-threads-id'][index
][1], numa
['paired-threads-source'][index
][1] ] )
435 if use_cpu_pinning
and not self
.develop_mode
:
436 text
+= self
.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning
)) +"</vcpu>" + \
437 self
.tab()+'<cputune>'
439 for i
in range(0, len(cpu_pinning
)):
440 text
+= self
.tab() + "<vcpupin vcpu='" +str(cpu_pinning
[i
][0])+ "' cpuset='" +str(cpu_pinning
[i
][1]) +"'/>"
441 text
+= self
.dec_tab()+'</cputune>'+ \
442 self
.tab() + '<numatune>' +\
443 self
.inc_tab() + "<memory mode='strict' nodeset='" +str(numa
['source'])+ "'/>" +\
444 self
.dec_tab() + '</numatune>'
447 return -1, "Instance without number of cpus"
448 text
+= self
.tab()+"<vcpu>" + str(vcpus
) + "</vcpu>"
453 if dev
['type']=='cdrom' :
456 text
+= self
.tab()+ '<os>' + \
457 self
.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
459 text
+= self
.tab() + "<boot dev='cdrom'/>"
460 text
+= self
.tab() + "<boot dev='hd'/>" + \
461 self
.dec_tab()+'</os>'
463 text
+= self
.tab()+'<features>'+\
464 self
.inc_tab()+'<acpi/>' +\
465 self
.tab()+'<apic/>' +\
466 self
.tab()+'<pae/>'+ \
467 self
.dec_tab() +'</features>'
468 if windows_os
or topo
=="oneSocket":
469 text
+= self
.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
471 text
+= self
.tab() + "<cpu mode='host-model'></cpu>"
472 text
+= self
.tab() + "<clock offset='utc'/>" +\
473 self
.tab() + "<on_poweroff>preserve</on_poweroff>" + \
474 self
.tab() + "<on_reboot>restart</on_reboot>" + \
475 self
.tab() + "<on_crash>restart</on_crash>"
476 text
+= self
.tab() + "<devices>" + \
477 self
.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
478 self
.tab() + "<serial type='pty'>" +\
479 self
.inc_tab() + "<target port='0'/>" + \
480 self
.dec_tab() + "</serial>" +\
481 self
.tab() + "<console type='pty'>" + \
482 self
.inc_tab()+ "<target type='serial' port='0'/>" + \
483 self
.dec_tab()+'</console>'
485 text
+= self
.tab() + "<controller type='usb' index='0'/>" + \
486 self
.tab() + "<controller type='ide' index='0'/>" + \
487 self
.tab() + "<input type='mouse' bus='ps2'/>" + \
488 self
.tab() + "<sound model='ich6'/>" + \
489 self
.tab() + "<video>" + \
490 self
.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
491 self
.dec_tab() + "</video>" + \
492 self
.tab() + "<memballoon model='virtio'/>" + \
493 self
.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
495 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
496 #> self.dec_tab()+'</hostdev>\n' +\
497 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
499 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
501 #If image contains 'GRAPH' include graphics
502 #if 'GRAPH' in image:
503 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
504 self
.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
505 self
.dec_tab() + "</graphics>"
509 bus_ide_dev
= bus_ide
510 if dev
['type']=='cdrom' or dev
['type']=='disk':
511 if dev
['type']=='cdrom':
513 text
+= self
.tab() + "<disk type='file' device='"+dev
['type']+"'>"
514 if 'file format' in dev
:
515 text
+= self
.inc_tab() + "<driver name='qemu' type='" +dev
['file format']+ "' cache='writethrough'/>"
516 if 'source file' in dev
:
517 text
+= self
.tab() + "<source file='" +dev
['source file']+ "'/>"
518 #elif v['type'] == 'block':
519 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
521 # return -1, 'Unknown disk type ' + v['type']
522 vpci
= dev
.get('vpci',None)
524 vpci
= dev
['metadata'].get('vpci',None)
525 text
+= self
.pci2xml(vpci
)
528 text
+= self
.tab() + "<target dev='hd" +vd_index
+ "' bus='ide'/>" #TODO allows several type of disks
530 text
+= self
.tab() + "<target dev='vd" +vd_index
+ "' bus='virtio'/>"
531 text
+= self
.dec_tab() + '</disk>'
532 vd_index
= chr(ord(vd_index
)+1)
533 elif dev
['type']=='xml':
534 dev_text
= dev
['xml']
536 dev_text
= dev_text
.replace('__vpci__', dev
['vpci'])
537 if 'source file' in dev
:
538 dev_text
= dev_text
.replace('__file__', dev
['source file'])
539 if 'file format' in dev
:
540 dev_text
= dev_text
.replace('__format__', dev
['source file'])
541 if '__dev__' in dev_text
:
542 dev_text
= dev_text
.replace('__dev__', vd_index
)
543 vd_index
= chr(ord(vd_index
)+1)
546 return -1, 'Unknown device type ' + dev
['type']
549 bridge_interfaces
= server
.get('networks', [])
550 for v
in bridge_interfaces
:
552 self
.db_lock
.acquire()
553 result
, content
= self
.db
.get_table(FROM
='nets', SELECT
=('provider',),WHERE
={'uuid':v
['net_id']} )
554 self
.db_lock
.release()
556 print "create_xml_server ERROR getting nets",result
, content
558 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
559 #I know it is not secure
560 #for v in sorted(desc['network interfaces'].itervalues()):
561 model
= v
.get("model", None)
562 if content
[0]['provider']=='default':
563 text
+= self
.tab() + "<interface type='network'>" + \
564 self
.inc_tab() + "<source network='" +content
[0]['provider']+ "'/>"
565 elif content
[0]['provider'][0:7]=='macvtap':
566 text
+= self
.tab()+"<interface type='direct'>" + \
567 self
.inc_tab() + "<source dev='" + self
.get_local_iface_name(content
[0]['provider'][8:]) + "' mode='bridge'/>" + \
568 self
.tab() + "<target dev='macvtap0'/>"
570 text
+= self
.tab() + "<alias name='net" + str(net_nb
) + "'/>"
573 elif content
[0]['provider'][0:6]=='bridge':
574 text
+= self
.tab() + "<interface type='bridge'>" + \
575 self
.inc_tab()+"<source bridge='" +self
.get_local_iface_name(content
[0]['provider'][7:])+ "'/>"
577 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
578 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
582 return -1, 'Unknown Bridge net provider ' + content
[0]['provider']
584 text
+= self
.tab() + "<model type='" +model
+ "'/>"
585 if v
.get('mac_address', None) != None:
586 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
587 text
+= self
.pci2xml(v
.get('vpci',None))
588 text
+= self
.dec_tab()+'</interface>'
592 interfaces
= numa
.get('interfaces', [])
596 if self
.develop_mode
: #map these interfaces to bridges
597 text
+= self
.tab() + "<interface type='bridge'>" + \
598 self
.inc_tab()+"<source bridge='" +self
.develop_bridge_iface
+ "'/>"
600 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
601 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
603 text
+= self
.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
604 if v
.get('mac_address', None) != None:
605 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
606 text
+= self
.pci2xml(v
.get('vpci',None))
607 text
+= self
.dec_tab()+'</interface>'
610 if v
['dedicated'] == 'yes': #passthrought
611 text
+= self
.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
612 self
.inc_tab() + "<source>"
614 text
+= self
.pci2xml(v
['source'])
615 text
+= self
.dec_tab()+'</source>'
616 text
+= self
.pci2xml(v
.get('vpci',None))
618 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
619 text
+= self
.dec_tab()+'</hostdev>'
621 else: #sriov_interfaces
622 #skip not connected interfaces
623 if v
.get("net_id") == None:
625 text
+= self
.tab() + "<interface type='hostdev' managed='yes'>"
627 if v
.get('mac_address', None) != None:
628 text
+= self
.tab() + "<mac address='" +v
['mac_address']+ "'/>"
629 text
+= self
.tab()+'<source>'
631 text
+= self
.pci2xml(v
['source'])
632 text
+= self
.dec_tab()+'</source>'
633 if v
.get('vlan',None) != None:
634 text
+= self
.tab() + "<vlan> <tag id='" + str(v
['vlan']) + "'/> </vlan>"
635 text
+= self
.pci2xml(v
.get('vpci',None))
637 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
638 text
+= self
.dec_tab()+'</interface>'
641 text
+= self
.dec_tab()+'</devices>'+\
642 self
.dec_tab()+'</domain>'
645 def pci2xml(self
, pci
):
646 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
647 alows an empty pci text'''
650 first_part
= pci
.split(':')
651 second_part
= first_part
[2].split('.')
652 return self
.tab() + "<address type='pci' domain='0x" + first_part
[0] + \
653 "' bus='0x" + first_part
[1] + "' slot='0x" + second_part
[0] + \
654 "' function='0x" + second_part
[1] + "'/>"
657 """Return indentation according to xml_level"""
658 return "\n" + (' '*self
.xml_level
)
661 """Increment and return indentation according to xml_level"""
666 """Decrement and return indentation according to xml_level"""
670 def get_file_info(self
, path
):
671 command
= 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
672 print self
.name
, ': command:', command
673 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
674 content
= stdout
.read()
675 if len(content
) == 0:
676 return None # file does not exist
678 return content
.split(" ") #(permission, 1, owner, group, size, date, file)
680 def qemu_get_info(self
, path
):
681 command
= 'qemu-img info ' + path
682 print self
.name
, ': command:', command
683 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
684 content
= stdout
.read()
685 if len(content
) == 0:
686 error
= stderr
.read()
687 print self
.name
, ": get_qemu_info error ", error
688 raise paramiko
.ssh_exception
.SSHException("Error getting qemu_info: " + error
)
691 return yaml
.load(content
)
692 except yaml
.YAMLError
as exc
:
694 if hasattr(exc
, 'problem_mark'):
695 mark
= exc
.problem_mark
696 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
697 print self
.name
, ": get_qemu_info yaml format Exception", text
698 raise paramiko
.ssh_exception
.SSHException("Error getting qemu_info yaml format" + text
)
700 def qemu_change_backing(self
, inc_file
, new_backing_file
):
701 command
= 'qemu-img rebase -u -b ' + new_backing_file
+ ' ' + inc_file
702 print self
.name
, ': command:', command
703 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
704 content
= stderr
.read()
705 if len(content
) == 0:
708 print self
.name
, ": qemu_change_backing error: ", content
711 def get_notused_filename(self
, proposed_name
, suffix
=''):
712 '''Look for a non existing file_name in the host
713 proposed_name: proposed file name, includes path
714 suffix: suffix to be added to the name, before the extention
716 extension
= proposed_name
.rfind(".")
717 slash
= proposed_name
.rfind("/")
718 if extension
< 0 or extension
< slash
: # no extension
719 extension
= len(proposed_name
)
720 target_name
= proposed_name
[:extension
] + suffix
+ proposed_name
[extension
:]
721 info
= self
.get_file_info(target_name
)
726 while info
is not None:
727 target_name
= proposed_name
[:extension
] + suffix
+ "-" + str(index
) + proposed_name
[extension
:]
729 info
= self
.get_file_info(target_name
)
732 def get_notused_path(self
, proposed_path
, suffix
=''):
733 '''Look for a non existing path at database for images
734 proposed_path: proposed file name, includes path
735 suffix: suffix to be added to the name, before the extention
737 extension
= proposed_path
.rfind(".")
739 extension
= len(proposed_path
)
741 target_path
= proposed_path
[:extension
] + suffix
+ proposed_path
[extension
:]
744 r
,_
=self
.db
.get_table(FROM
="images",WHERE
={"path":target_path
})
747 target_path
= proposed_path
[:extension
] + suffix
+ "-" + str(index
) + proposed_path
[extension
:]
751 def delete_file(self
, file_name
):
752 command
= 'rm -f '+file_name
753 print self
.name
, ': command:', command
754 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
755 error_msg
= stderr
.read()
756 if len(error_msg
) > 0:
757 raise paramiko
.ssh_exception
.SSHException("Error deleting file: " + error_msg
)
759 def copy_file(self
, source
, destination
, perserve_time
=True):
760 if source
[0:4]=="http":
761 command
= "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
762 dst
=destination
, src
=source
, dst_result
=destination
+ ".result" )
764 command
= 'cp --no-preserve=mode'
766 command
+= ' --preserve=timestamps'
767 command
+= " '{}' '{}'".format(source
, destination
)
768 print self
.name
, ': command:', command
769 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
770 error_msg
= stderr
.read()
771 if len(error_msg
) > 0:
772 raise paramiko
.ssh_exception
.SSHException("Error copying image to local host: " + error_msg
)
774 def copy_remote_file(self
, remote_file
, use_incremental
):
775 ''' Copy a file from the repository to local folder and recursively
776 copy the backing files in case the remote file is incremental
777 Read and/or modified self.localinfo['files'] that contain the
778 unmodified copies of images in the local path
780 remote_file: path of remote file
781 use_incremental: None (leave the decision to this function), True, False
783 local_file: name of local file
784 qemu_info: dict with quemu information of local file
785 use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
788 use_incremental_out
= use_incremental
789 new_backing_file
= None
791 file_from_local
= True
793 #in case incremental use is not decided, take the decision depending on the image
794 #avoid the use of incremental if this image is already incremental
795 if remote_file
[0:4] == "http":
796 file_from_local
= False
798 qemu_remote_info
= self
.qemu_get_info(remote_file
)
799 if use_incremental_out
==None:
800 use_incremental_out
= not ( file_from_local
and 'backing file' in qemu_remote_info
)
801 #copy recursivelly the backing files
802 if file_from_local
and 'backing file' in qemu_remote_info
:
803 new_backing_file
, _
, _
= self
.copy_remote_file(qemu_remote_info
['backing file'], True)
805 #check if remote file is present locally
806 if use_incremental_out
and remote_file
in self
.localinfo
['files']:
807 local_file
= self
.localinfo
['files'][remote_file
]
808 local_file_info
= self
.get_file_info(local_file
)
810 remote_file_info
= self
.get_file_info(remote_file
)
811 if local_file_info
== None:
813 elif file_from_local
and (local_file_info
[4]!=remote_file_info
[4] or local_file_info
[5]!=remote_file_info
[5]):
814 #local copy of file not valid because date or size are different.
815 #TODO DELETE local file if this file is not used by any active virtual machine
817 self
.delete_file(local_file
)
818 del self
.localinfo
['files'][remote_file
]
822 else: #check that the local file has the same backing file, or there are not backing at all
823 qemu_info
= self
.qemu_get_info(local_file
)
824 if new_backing_file
!= qemu_info
.get('backing file'):
828 if local_file
== None: #copy the file
829 img_name
= remote_file
.split('/') [-1]
830 img_local
= self
.image_path
+ '/' + img_name
831 local_file
= self
.get_notused_filename(img_local
)
832 self
.copy_file(remote_file
, local_file
, use_incremental_out
)
834 if use_incremental_out
:
835 self
.localinfo
['files'][remote_file
] = local_file
837 self
.qemu_change_backing(local_file
, new_backing_file
)
838 qemu_info
= self
.qemu_get_info(local_file
)
840 return local_file
, qemu_info
, use_incremental_out
842 def launch_server(self
, conn
, server
, rebuild
=False, domain
=None):
844 time
.sleep(random
.randint(20,150)) #sleep random timeto be make it a bit more real
847 server_id
= server
['uuid']
848 paused
= server
.get('paused','no')
850 if domain
!=None and rebuild
==False:
852 #self.server_status[server_id] = 'ACTIVE'
855 self
.db_lock
.acquire()
856 result
, server_data
= self
.db
.get_instance(server_id
)
857 self
.db_lock
.release()
859 print self
.name
, ": launch_server ERROR getting server from DB",result
, server_data
860 return result
, server_data
862 #0: get image metadata
863 server_metadata
= server
.get('metadata', {})
864 use_incremental
= None
866 if "use_incremental" in server_metadata
:
867 use_incremental
= False if server_metadata
["use_incremental"]=="no" else True
869 server_host_files
= self
.localinfo
['server_files'].get( server
['uuid'], {})
871 #delete previous incremental files
872 for file_
in server_host_files
.values():
873 self
.delete_file(file_
['source file'] )
876 #1: obtain aditional devices (disks)
877 #Put as first device the main disk
878 devices
= [ {"type":"disk", "image_id":server
['image_id'], "vpci":server_metadata
.get('vpci', None) } ]
879 if 'extended' in server_data
and server_data
['extended']!=None and "devices" in server_data
['extended']:
880 devices
+= server_data
['extended']['devices']
883 if dev
['image_id'] == None:
886 self
.db_lock
.acquire()
887 result
, content
= self
.db
.get_table(FROM
='images', SELECT
=('path','metadata'),WHERE
={'uuid':dev
['image_id']} )
888 self
.db_lock
.release()
890 error_text
= "ERROR", result
, content
, "when getting image", dev
['image_id']
891 print self
.name
, ": launch_server", error_text
892 return -1, error_text
893 if content
[0]['metadata'] is not None:
894 dev
['metadata'] = json
.loads(content
[0]['metadata'])
898 if dev
['image_id'] in server_host_files
:
899 dev
['source file'] = server_host_files
[ dev
['image_id'] ] ['source file'] #local path
900 dev
['file format'] = server_host_files
[ dev
['image_id'] ] ['file format'] # raw or qcow2
903 #2: copy image to host
904 remote_file
= content
[0]['path']
905 use_incremental_image
= use_incremental
906 if dev
['metadata'].get("use_incremental") == "no":
907 use_incremental_image
= False
908 local_file
, qemu_info
, use_incremental_image
= self
.copy_remote_file(remote_file
, use_incremental_image
)
910 #create incremental image
911 if use_incremental_image
:
912 local_file_inc
= self
.get_notused_filename(local_file
, '.inc')
913 command
= 'qemu-img create -f qcow2 '+local_file_inc
+ ' -o backing_file='+ local_file
914 print 'command:', command
915 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
916 error_msg
= stderr
.read()
917 if len(error_msg
) > 0:
918 raise paramiko
.ssh_exception
.SSHException("Error creating incremental file: " + error_msg
)
919 local_file
= local_file_inc
920 qemu_info
= {'file format':'qcow2'}
922 server_host_files
[ dev
['image_id'] ] = {'source file': local_file
, 'file format': qemu_info
['file format']}
924 dev
['source file'] = local_file
925 dev
['file format'] = qemu_info
['file format']
927 self
.localinfo
['server_files'][ server
['uuid'] ] = server_host_files
928 self
.localinfo_dirty
= True
931 result
, xml
= self
.create_xml_server(server_data
, devices
, server_metadata
) #local_file
933 print self
.name
, ": create xml server error:", xml
935 print self
.name
, ": create xml:", xml
936 atribute
= libvirt
.VIR_DOMAIN_START_PAUSED
if paused
== "yes" else 0
938 if not rebuild
: #ensures that any pending destroying server is done
939 self
.server_forceoff(True)
940 #print self.name, ": launching instance" #, xml
941 conn
.createXML(xml
, atribute
)
942 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
946 except paramiko
.ssh_exception
.SSHException
as e
:
948 print self
.name
, ": launch_server(%s) ssh Exception: %s" %(server_id
, text
)
949 if "SSH session not active" in text
:
951 except libvirt
.libvirtError
as e
:
952 text
= e
.get_error_message()
953 print self
.name
, ": launch_server(%s) libvirt Exception: %s" %(server_id
, text
)
954 except Exception as e
:
956 print self
.name
, ": launch_server(%s) Exception: %s" %(server_id
, text
)
959 def update_servers_status(self
):
961 # VIR_DOMAIN_NOSTATE = 0
962 # VIR_DOMAIN_RUNNING = 1
963 # VIR_DOMAIN_BLOCKED = 2
964 # VIR_DOMAIN_PAUSED = 3
965 # VIR_DOMAIN_SHUTDOWN = 4
966 # VIR_DOMAIN_SHUTOFF = 5
967 # VIR_DOMAIN_CRASHED = 6
968 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended
970 if self
.test
or len(self
.server_status
)==0:
974 conn
= libvirt
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
975 domains
= conn
.listAllDomains()
977 for domain
in domains
:
978 uuid
= domain
.UUIDString() ;
979 libvirt_status
= domain
.state()
980 #print libvirt_status
981 if libvirt_status
[0] == libvirt
.VIR_DOMAIN_RUNNING
or libvirt_status
[0] == libvirt
.VIR_DOMAIN_SHUTDOWN
:
982 new_status
= "ACTIVE"
983 elif libvirt_status
[0] == libvirt
.VIR_DOMAIN_PAUSED
:
984 new_status
= "PAUSED"
985 elif libvirt_status
[0] == libvirt
.VIR_DOMAIN_SHUTOFF
:
986 new_status
= "INACTIVE"
987 elif libvirt_status
[0] == libvirt
.VIR_DOMAIN_CRASHED
:
991 domain_dict
[uuid
] = new_status
993 except libvirt
.libvirtError
as e
:
994 print self
.name
, ": get_state() Exception '", e
.get_error_message()
997 for server_id
, current_status
in self
.server_status
.iteritems():
999 if server_id
in domain_dict
:
1000 new_status
= domain_dict
[server_id
]
1002 new_status
= "INACTIVE"
1004 if new_status
== None or new_status
== current_status
:
1006 if new_status
== 'INACTIVE' and current_status
== 'ERROR':
1007 continue #keep ERROR status, because obviously this machine is not running
1009 print self
.name
, ": server ", server_id
, "status change from ", current_status
, "to", new_status
1010 STATUS
={'progress':100, 'status':new_status
}
1011 if new_status
== 'ERROR':
1012 STATUS
['last_error'] = 'machine has crashed'
1013 self
.db_lock
.acquire()
1014 r
,_
= self
.db
.update_rows('instances', STATUS
, {'uuid':server_id
}, log
=False)
1015 self
.db_lock
.release()
1017 self
.server_status
[server_id
] = new_status
1019 def action_on_server(self
, req
, last_retry
=True):
1020 '''Perform an action on a req
1022 req: dictionary that contain:
1023 server properties: 'uuid','name','tenant_id','status'
1025 host properties: 'user', 'ip_name'
1026 return (error, text)
1027 0: No error. VM is updated to new state,
1028 -1: Invalid action, as trying to pause a PAUSED VM
1029 -2: Error accessing host
1031 -4: Error at DB access
1032 -5: Error while trying to perform action. VM is updated to ERROR
1034 server_id
= req
['uuid']
1037 old_status
= req
['status']
1041 if 'terminate' in req
['action']:
1042 new_status
= 'deleted'
1043 elif 'shutoff' in req
['action'] or 'shutdown' in req
['action'] or 'forceOff' in req
['action']:
1044 if req
['status']!='ERROR':
1046 new_status
= 'INACTIVE'
1047 elif 'start' in req
['action'] and req
['status']!='ERROR': new_status
= 'ACTIVE'
1048 elif 'resume' in req
['action'] and req
['status']!='ERROR' and req
['status']!='INACTIVE' : new_status
= 'ACTIVE'
1049 elif 'pause' in req
['action'] and req
['status']!='ERROR': new_status
= 'PAUSED'
1050 elif 'reboot' in req
['action'] and req
['status']!='ERROR': new_status
= 'ACTIVE'
1051 elif 'rebuild' in req
['action']:
1052 time
.sleep(random
.randint(20,150))
1053 new_status
= 'ACTIVE'
1054 elif 'createImage' in req
['action']:
1056 self
.create_image(None, req
)
1059 conn
= libvirt
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1061 dom
= conn
.lookupByUUIDString(server_id
)
1062 except libvirt
.libvirtError
as e
:
1063 text
= e
.get_error_message()
1064 if 'LookupByUUIDString' in text
or 'Domain not found' in text
or 'No existe un dominio coincidente' in text
:
1067 print self
.name
, ": action_on_server(",server_id
,") libvirt exception:", text
1070 if 'forceOff' in req
['action']:
1072 print self
.name
, ": action_on_server(",server_id
,") domain not running"
1075 print self
.name
, ": sending DESTROY to server", server_id
1077 except Exception as e
:
1078 if "domain is not running" not in e
.get_error_message():
1079 print self
.name
, ": action_on_server(",server_id
,") Exception while sending force off:", e
.get_error_message()
1080 last_error
= 'action_on_server Exception while destroy: ' + e
.get_error_message()
1081 new_status
= 'ERROR'
1083 elif 'terminate' in req
['action']:
1085 print self
.name
, ": action_on_server(",server_id
,") domain not running"
1086 new_status
= 'deleted'
1089 if req
['action']['terminate'] == 'force':
1090 print self
.name
, ": sending DESTROY to server", server_id
1092 new_status
= 'deleted'
1094 print self
.name
, ": sending SHUTDOWN to server", server_id
1096 self
.pending_terminate_server
.append( (time
.time()+10,server_id
) )
1097 except Exception as e
:
1098 print self
.name
, ": action_on_server(",server_id
,") Exception while destroy:", e
.get_error_message()
1099 last_error
= 'action_on_server Exception while destroy: ' + e
.get_error_message()
1100 new_status
= 'ERROR'
1101 if "domain is not running" in e
.get_error_message():
1104 new_status
= 'deleted'
1106 print self
.name
, ": action_on_server(",server_id
,") Exception while undefine:", e
.get_error_message()
1107 last_error
= 'action_on_server Exception2 while undefine:', e
.get_error_message()
1108 #Exception: 'virDomainDetachDevice() failed'
1109 if new_status
=='deleted':
1110 if server_id
in self
.server_status
:
1111 del self
.server_status
[server_id
]
1112 if req
['uuid'] in self
.localinfo
['server_files']:
1113 for file_
in self
.localinfo
['server_files'][ req
['uuid'] ].values():
1115 self
.delete_file(file_
['source file'])
1118 del self
.localinfo
['server_files'][ req
['uuid'] ]
1119 self
.localinfo_dirty
= True
1121 elif 'shutoff' in req
['action'] or 'shutdown' in req
['action']:
1124 print self
.name
, ": action_on_server(",server_id
,") domain not running"
1127 # new_status = 'INACTIVE'
1128 #TODO: check status for changing at database
1129 except Exception as e
:
1130 new_status
= 'ERROR'
1131 print self
.name
, ": action_on_server(",server_id
,") Exception while shutdown:", e
.get_error_message()
1132 last_error
= 'action_on_server Exception while shutdown: ' + e
.get_error_message()
1134 elif 'rebuild' in req
['action']:
1137 r
= self
.launch_server(conn
, req
, True, None)
1139 new_status
= 'ERROR'
1142 new_status
= 'ACTIVE'
1143 elif 'start' in req
['action']:
1144 #La instancia está sólo en la base de datos pero no en la libvirt. es necesario crearla
1145 rebuild
= True if req
['action']['start']=='rebuild' else False
1146 r
= self
.launch_server(conn
, req
, rebuild
, dom
)
1148 new_status
= 'ERROR'
1151 new_status
= 'ACTIVE'
1153 elif 'resume' in req
['action']:
1159 # new_status = 'ACTIVE'
1160 except Exception as e
:
1161 print self
.name
, ": action_on_server(",server_id
,") Exception while resume:", e
.get_error_message()
1163 elif 'pause' in req
['action']:
1169 # new_status = 'PAUSED'
1170 except Exception as e
:
1171 print self
.name
, ": action_on_server(",server_id
,") Exception while pause:", e
.get_error_message()
1173 elif 'reboot' in req
['action']:
1179 print self
.name
, ": action_on_server(",server_id
,") reboot:"
1180 #new_status = 'ACTIVE'
1181 except Exception as e
:
1182 print self
.name
, ": action_on_server(",server_id
,") Exception while reboot:", e
.get_error_message()
1183 elif 'createImage' in req
['action']:
1184 self
.create_image(dom
, req
)
1188 except libvirt
.libvirtError
as e
:
1189 if conn
is not None: conn
.close
1190 text
= e
.get_error_message()
1191 new_status
= "ERROR"
1193 print self
.name
, ": action_on_server(",server_id
,") Exception '", text
1194 if 'LookupByUUIDString' in text
or 'Domain not found' in text
or 'No existe un dominio coincidente' in text
:
1195 print self
.name
, ": action_on_server(",server_id
,") Exception removed from host"
1196 #end of if self.test
1197 if new_status
== None:
1200 print self
.name
, ": action_on_server(",server_id
,") new status", new_status
, last_error
1201 UPDATE
= {'progress':100, 'status':new_status
}
1203 if new_status
=='ERROR':
1204 if not last_retry
: #if there will be another retry do not update database
1206 elif 'terminate' in req
['action']:
1207 #PUT a log in the database
1208 print self
.name
, ": PANIC deleting server", server_id
, last_error
1209 self
.db_lock
.acquire()
1210 self
.db
.new_row('logs',
1211 {'uuid':server_id
, 'tenant_id':req
['tenant_id'], 'related':'instances','level':'panic',
1212 'description':'PANIC deleting server from host '+self
.name
+': '+last_error
}
1214 self
.db_lock
.release()
1215 if server_id
in self
.server_status
:
1216 del self
.server_status
[server_id
]
1219 UPDATE
['last_error'] = last_error
1220 if new_status
!= 'deleted' and (new_status
!= old_status
or new_status
== 'ERROR') :
1221 self
.db_lock
.acquire()
1222 self
.db
.update_rows('instances', UPDATE
, {'uuid':server_id
}, log
=True)
1223 self
.server_status
[server_id
] = new_status
1224 self
.db_lock
.release()
1225 if new_status
== 'ERROR':
1230 def restore_iface(self
, name
, mac
, lib_conn
=None):
1231 ''' make an ifdown, ifup to restore default parameter of na interface
1233 mac: mac address of the interface
1234 lib_conn: connection to the libvirt, if None a new connection is created
1235 Return 0,None if ok, -1,text if fails
1241 print self
.name
, ": restore_iface '%s' %s" % (name
, mac
)
1245 conn
= libvirt
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1249 #wait to the pending VM deletion
1250 #TODO.Revise self.server_forceoff(True)
1252 iface
= conn
.interfaceLookupByMACString(mac
)
1255 print self
.name
, ": restore_iface '%s' %s" % (name
, mac
)
1256 except libvirt
.libvirtError
as e
:
1257 error_text
= e
.get_error_message()
1258 print self
.name
, ": restore_iface '%s' '%s' libvirt exception: %s" %(name
, mac
, error_text
)
1261 if lib_conn
is None and conn
is not None:
1263 return ret
, error_text
1266 def create_image(self
,dom
, req
):
1268 if 'path' in req
['action']['createImage']:
1269 file_dst
= req
['action']['createImage']['path']
1271 createImage
=req
['action']['createImage']
1272 img_name
= createImage
['source']['path']
1273 index
=img_name
.rfind('/')
1274 file_dst
= self
.get_notused_path(img_name
[:index
+1] + createImage
['name'] + '.qcow2')
1275 image_status
='ACTIVE'
1279 server_id
= req
['uuid']
1280 createImage
=req
['action']['createImage']
1281 file_orig
= self
.localinfo
['server_files'][server_id
] [ createImage
['source']['image_id'] ] ['source file']
1282 if 'path' in req
['action']['createImage']:
1283 file_dst
= req
['action']['createImage']['path']
1285 img_name
= createImage
['source']['path']
1286 index
=img_name
.rfind('/')
1287 file_dst
= self
.get_notused_filename(img_name
[:index
+1] + createImage
['name'] + '.qcow2')
1289 self
.copy_file(file_orig
, file_dst
)
1290 qemu_info
= self
.qemu_get_info(file_orig
)
1291 if 'backing file' in qemu_info
:
1292 for k
,v
in self
.localinfo
['files'].items():
1293 if v
==qemu_info
['backing file']:
1294 self
.qemu_change_backing(file_dst
, k
)
1296 image_status
='ACTIVE'
1298 except paramiko
.ssh_exception
.SSHException
as e
:
1299 image_status
='ERROR'
1300 error_text
= e
.args
[0]
1301 print self
.name
, "': create_image(",server_id
,") ssh Exception:", error_text
1302 if "SSH session not active" in error_text
and retry
==0:
1304 except Exception as e
:
1305 image_status
='ERROR'
1307 print self
.name
, "': create_image(",server_id
,") Exception:", error_text
1309 #TODO insert a last_error at database
1310 self
.db_lock
.acquire()
1311 self
.db
.update_rows('images', {'status':image_status
, 'progress': 100, 'path':file_dst
},
1312 {'uuid':req
['new_image']['uuid']}, log
=True)
1313 self
.db_lock
.release()
1315 def edit_iface(self
, port_id
, old_net
, new_net
):
1316 #This action imply remove and insert interface to put proper parameters
1321 self
.db_lock
.acquire()
1322 r
,c
= self
.db
.get_table(FROM
='ports as p join resources_port as rp on p.uuid=rp.port_id',
1323 WHERE
={'port_id': port_id
})
1324 self
.db_lock
.release()
1326 print self
.name
, ": edit_iface(",port_id
,") DDBB error:", c
1329 print self
.name
, ": edit_iface(",port_id
,") por not found"
1332 if port
["model"]!="VF":
1333 print self
.name
, ": edit_iface(",port_id
,") ERROR model must be VF"
1335 #create xml detach file
1338 xml
.append("<interface type='hostdev' managed='yes'>")
1339 xml
.append(" <mac address='" +port
['mac']+ "'/>")
1340 xml
.append(" <source>"+ self
.pci2xml(port
['pci'])+"\n </source>")
1341 xml
.append('</interface>')
1346 conn
= libvirt
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
1347 dom
= conn
.lookupByUUIDString(port
["instance_id"])
1350 print self
.name
, ": edit_iface detaching SRIOV interface", text
1351 dom
.detachDeviceFlags(text
, flags
=libvirt
.VIR_DOMAIN_AFFECT_LIVE
)
1353 xml
[-1] =" <vlan> <tag id='" + str(port
['vlan']) + "'/> </vlan>"
1355 xml
.append(self
.pci2xml(port
.get('vpci',None)) )
1356 xml
.append('</interface>')
1358 print self
.name
, ": edit_iface attaching SRIOV interface", text
1359 dom
.attachDeviceFlags(text
, flags
=libvirt
.VIR_DOMAIN_AFFECT_LIVE
)
1361 except libvirt
.libvirtError
as e
:
1362 text
= e
.get_error_message()
1363 print self
.name
, ": edit_iface(",port
["instance_id"],") libvirt exception:", text
1366 if conn
is not None: conn
.close
1369 def create_server(server
, db
, db_lock
, only_of_ports
):
1376 # host_id = server.get('host_id', None)
1377 extended
= server
.get('extended', None)
1379 # print '----------------------'
1380 # print json.dumps(extended, indent=4)
1383 requirements
['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1384 requirements
['ram'] = server
['flavor'].get('ram', 0)
1385 if requirements
['ram']== None:
1386 requirements
['ram'] = 0
1387 requirements
['vcpus'] = server
['flavor'].get('vcpus', 0)
1388 if requirements
['vcpus']== None:
1389 requirements
['vcpus'] = 0
1390 #If extended is not defined get requirements from flavor
1391 if extended
is None:
1392 #If extended is defined in flavor convert to dictionary and use it
1393 if 'extended' in server
['flavor'] and server
['flavor']['extended'] != None:
1394 json_acceptable_string
= server
['flavor']['extended'].replace("'", "\"")
1395 extended
= json
.loads(json_acceptable_string
)
1398 #print json.dumps(extended, indent=4)
1400 #For simplicity only one numa VM are supported in the initial implementation
1401 if extended
!= None:
1402 numas
= extended
.get('numas', [])
1404 return (-2, "Multi-NUMA VMs are not supported yet")
1406 # return (-1, "At least one numa must be specified")
1408 #a for loop is used in order to be ready to multi-NUMA VMs
1412 numa_req
['memory'] = numa
.get('memory', 0)
1414 numa_req
['proc_req_nb'] = numa
['cores'] #number of cores or threads to be reserved
1415 numa_req
['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1416 numa_req
['proc_req_list'] = numa
.get('cores-id', None) #list of ids to be assigned to the cores or threads
1417 elif 'paired-threads' in numa
:
1418 numa_req
['proc_req_nb'] = numa
['paired-threads']
1419 numa_req
['proc_req_type'] = 'paired-threads'
1420 numa_req
['proc_req_list'] = numa
.get('paired-threads-id', None)
1421 elif 'threads' in numa
:
1422 numa_req
['proc_req_nb'] = numa
['threads']
1423 numa_req
['proc_req_type'] = 'threads'
1424 numa_req
['proc_req_list'] = numa
.get('threads-id', None)
1426 numa_req
['proc_req_nb'] = 0 # by default
1427 numa_req
['proc_req_type'] = 'threads'
1431 #Generate a list of sriov and another for physical interfaces
1432 interfaces
= numa
.get('interfaces', [])
1435 for iface
in interfaces
:
1436 iface
['bandwidth'] = int(iface
['bandwidth'])
1437 if iface
['dedicated'][:3]=='yes':
1438 port_list
.append(iface
)
1440 sriov_list
.append(iface
)
1442 #Save lists ordered from more restrictive to less bw requirements
1443 numa_req
['sriov_list'] = sorted(sriov_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1444 numa_req
['port_list'] = sorted(port_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1447 request
.append(numa_req
)
1449 # print "----------\n"+json.dumps(request[0], indent=4)
1450 # print '----------\n\n'
1452 #Search in db for an appropriate numa for each requested numa
1453 #at the moment multi-NUMA VMs are not supported
1455 requirements
['numa'].update(request
[0])
1456 if requirements
['numa']['memory']>0:
1457 requirements
['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
1458 elif requirements
['ram']==0:
1459 return (-1, "Memory information not set neither at extended field not at ram")
1460 if requirements
['numa']['proc_req_nb']>0:
1461 requirements
['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
1462 elif requirements
['vcpus']==0:
1463 return (-1, "Processor information not set neither at extended field not at vcpus")
1467 result
, content
= db
.get_numas(requirements
, server
.get('host_id', None), only_of_ports
)
1471 return (-1, content
)
1473 numa_id
= content
['numa_id']
1474 host_id
= content
['host_id']
1476 #obtain threads_id and calculate pinning
1479 if requirements
['numa']['proc_req_nb']>0:
1481 result
, content
= db
.get_table(FROM
='resources_core',
1482 SELECT
=('id','core_id','thread_id'),
1483 WHERE
={'numa_id':numa_id
,'instance_id': None, 'status':'ok'} )
1489 #convert rows to a dictionary indexed by core_id
1492 if not row
['core_id'] in cores_dict
:
1493 cores_dict
[row
['core_id']] = []
1494 cores_dict
[row
['core_id']].append([row
['thread_id'],row
['id']])
1496 #In case full cores are requested
1498 if requirements
['numa']['proc_req_type'] == 'cores':
1499 #Get/create the list of the vcpu_ids
1500 vcpu_id_list
= requirements
['numa']['proc_req_list']
1501 if vcpu_id_list
== None:
1502 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1504 for threads
in cores_dict
.itervalues():
1506 if len(threads
) != 2:
1509 #set pinning for the first thread
1510 cpu_pinning
.append( [ vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1] ] )
1512 #reserve so it is not used the second thread
1513 reserved_threads
.append(threads
[1][1])
1515 if len(vcpu_id_list
) == 0:
1518 #In case paired threads are requested
1519 elif requirements
['numa']['proc_req_type'] == 'paired-threads':
1521 #Get/create the list of the vcpu_ids
1522 if requirements
['numa']['proc_req_list'] != None:
1524 for pair
in requirements
['numa']['proc_req_list']:
1526 return -1, "Field paired-threads-id not properly specified"
1528 vcpu_id_list
.append(pair
[0])
1529 vcpu_id_list
.append(pair
[1])
1531 vcpu_id_list
= range(0,2*int(requirements
['numa']['proc_req_nb']))
1533 for threads
in cores_dict
.itervalues():
1535 if len(threads
) != 2:
1537 #set pinning for the first thread
1538 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1540 #set pinning for the second thread
1541 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1543 if len(vcpu_id_list
) == 0:
1546 #In case normal threads are requested
1547 elif requirements
['numa']['proc_req_type'] == 'threads':
1548 #Get/create the list of the vcpu_ids
1549 vcpu_id_list
= requirements
['numa']['proc_req_list']
1550 if vcpu_id_list
== None:
1551 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1553 for threads_index
in sorted(cores_dict
, key
=lambda k
: len(cores_dict
[k
])):
1554 threads
= cores_dict
[threads_index
]
1555 #set pinning for the first thread
1556 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1558 #if exists, set pinning for the second thread
1559 if len(threads
) == 2 and len(vcpu_id_list
) != 0:
1560 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1562 if len(vcpu_id_list
) == 0:
1565 #Get the source pci addresses for the selected numa
1566 used_sriov_ports
= []
1567 for port
in requirements
['numa']['sriov_list']:
1569 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1575 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1577 port
['pci'] = row
['pci']
1578 if 'mac_address' not in port
:
1579 port
['mac_address'] = row
['mac']
1581 port
['port_id']=row
['id']
1582 port
['Mbps_used'] = port
['bandwidth']
1583 used_sriov_ports
.append(row
['id'])
1586 for port
in requirements
['numa']['port_list']:
1587 port
['Mbps_used'] = None
1588 if port
['dedicated'] != "yes:sriov":
1589 port
['mac_address'] = port
['mac']
1593 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac', 'Mbps'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1598 port
['Mbps_used'] = content
[0]['Mbps']
1600 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1602 port
['pci'] = row
['pci']
1603 if 'mac_address' not in port
:
1604 port
['mac_address'] = row
['mac'] # mac cannot be set to passthrough ports
1606 port
['port_id']=row
['id']
1607 used_sriov_ports
.append(row
['id'])
1610 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1611 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1613 server
['host_id'] = host_id
1616 #Generate dictionary for saving in db the instance resources
1618 resources
['bridged-ifaces'] = []
1621 numa_dict
['interfaces'] = []
1623 numa_dict
['interfaces'] += requirements
['numa']['port_list']
1624 numa_dict
['interfaces'] += requirements
['numa']['sriov_list']
1626 #Check bridge information
1627 unified_dataplane_iface
=[]
1628 unified_dataplane_iface
+= requirements
['numa']['port_list']
1629 unified_dataplane_iface
+= requirements
['numa']['sriov_list']
1631 for control_iface
in server
.get('networks', []):
1632 control_iface
['net_id']=control_iface
.pop('uuid')
1633 #Get the brifge name
1635 result
, content
= db
.get_table(FROM
='nets', SELECT
=('name','type', 'vlan'),WHERE
={'uuid':control_iface
['net_id']} )
1640 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface
['net_id']
1643 if control_iface
.get("type", 'virtual') == 'virtual':
1644 if network
['type']!='bridge_data' and network
['type']!='bridge_man':
1645 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface
['net_id']
1646 resources
['bridged-ifaces'].append(control_iface
)
1648 if network
['type']!='data' and network
['type']!='ptp':
1649 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface
['net_id']
1650 #dataplane interface, look for it in the numa tree and asign this network
1652 for dataplane_iface
in numa_dict
['interfaces']:
1653 if dataplane_iface
['name'] == control_iface
.get("name"):
1654 if (dataplane_iface
['dedicated'] == "yes" and control_iface
["type"] != "PF") or \
1655 (dataplane_iface
['dedicated'] == "no" and control_iface
["type"] != "VF") or \
1656 (dataplane_iface
['dedicated'] == "yes:sriov" and control_iface
["type"] != "VFnotShared") :
1657 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1658 (control_iface
.get("name"), dataplane_iface
['dedicated'], control_iface
["type"])
1659 dataplane_iface
['uuid'] = control_iface
['net_id']
1660 if dataplane_iface
['dedicated'] == "no":
1661 dataplane_iface
['vlan'] = network
['vlan']
1662 if dataplane_iface
['dedicated'] != "yes" and control_iface
.get("mac_address"):
1663 dataplane_iface
['mac_address'] = control_iface
.get("mac_address")
1664 if control_iface
.get("vpci"):
1665 dataplane_iface
['vpci'] = control_iface
.get("vpci")
1669 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface
.get("name")
1671 resources
['host_id'] = host_id
1672 resources
['image_id'] = server
['image_id']
1673 resources
['flavor_id'] = server
['flavor_id']
1674 resources
['tenant_id'] = server
['tenant_id']
1675 resources
['ram'] = requirements
['ram']
1676 resources
['vcpus'] = requirements
['vcpus']
1677 resources
['status'] = 'CREATING'
1679 if 'description' in server
: resources
['description'] = server
['description']
1680 if 'name' in server
: resources
['name'] = server
['name']
1682 resources
['extended'] = {} #optional
1683 resources
['extended']['numas'] = []
1684 numa_dict
['numa_id'] = numa_id
1685 numa_dict
['memory'] = requirements
['numa']['memory']
1686 numa_dict
['cores'] = []
1688 for core
in cpu_pinning
:
1689 numa_dict
['cores'].append({'id': core
[2], 'vthread': core
[0], 'paired': paired
})
1690 for core
in reserved_threads
:
1691 numa_dict
['cores'].append({'id': core
})
1692 resources
['extended']['numas'].append(numa_dict
)
1693 if extended
!=None and 'devices' in extended
: #TODO allow extra devices without numa
1694 resources
['extended']['devices'] = extended
['devices']
1697 print '===================================={'
1698 print json
.dumps(resources
, indent
=4)
1699 print '====================================}'