1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
25 This is a thread that interacts with the host and with libvirt to manage VMs.
26 One thread will be launched per host.
28 __author__
="Pablo Montes, Alfonso Tierno"
29 __date__
="$10-jul-2014 12:07:15$"
38 from jsonschema
import validate
as js_v
, exceptions
as js_e
40 from vim_schema
import localinfo_schema
, hostinfo_schema
42 #from logging import Logger
43 #import auxiliary_functions as af
45 #TODO: insert a logging system
47 class host_thread(threading
.Thread
):
48 def __init__(self
, name
, host
, user
, db
, db_lock
, test
, image_path
, host_id
, version
, develop_mode
, develop_bridge_iface
):
53 'host','user': host ip or name to manage and user
54 'db', 'db_lock': database class and lock to use it in exclusion
56 threading
.Thread
.__init
__(self
)
61 self
.db_lock
= db_lock
63 self
.develop_mode
= develop_mode
64 self
.develop_bridge_iface
= develop_bridge_iface
65 self
.image_path
= image_path
66 self
.host_id
= host_id
67 self
.version
= version
72 self
.server_status
= {} #dictionary with pairs server_uuid:server_status
73 self
.pending_terminate_server
=[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
74 self
.next_update_server_status
= 0 #time when must be check servers status
78 self
.queueLock
= threading
.Lock()
79 self
.taskQueue
= Queue
.Queue(2000)
81 def ssh_connect(self
):
84 self
.ssh_conn
= paramiko
.SSHClient()
85 self
.ssh_conn
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
86 self
.ssh_conn
.load_system_host_keys()
87 self
.ssh_conn
.connect(self
.host
, username
=self
.user
, timeout
=10) #, None)
88 except paramiko
.ssh_exception
.SSHException
as e
:
90 print self
.name
, ": ssh_connect ssh Exception:", text
92 def load_localinfo(self
):
98 command
= 'mkdir -p ' + self
.image_path
99 #print self.name, ': command:', command
100 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
101 content
= stderr
.read()
103 print self
.name
, ': command:', command
, "stderr:", content
105 command
= 'cat ' + self
.image_path
+ '/.openvim.yaml'
106 #print self.name, ': command:', command
107 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
108 content
= stdout
.read()
109 if len(content
) == 0:
110 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
111 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
112 self
.localinfo
= yaml
.load(content
)
113 js_v(self
.localinfo
, localinfo_schema
)
114 self
.localinfo_dirty
=False
115 if 'server_files' not in self
.localinfo
:
116 self
.localinfo
['server_files'] = {}
117 print self
.name
, ': localinfo load from host'
120 except paramiko
.ssh_exception
.SSHException
as e
:
122 print self
.name
, ": load_localinfo ssh Exception:", text
123 except libvirt
.libvirtError
as e
:
124 text
= e
.get_error_message()
125 print self
.name
, ": load_localinfo libvirt Exception:", text
126 except yaml
.YAMLError
as exc
:
128 if hasattr(exc
, 'problem_mark'):
129 mark
= exc
.problem_mark
130 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
131 print self
.name
, ": load_localinfo yaml format Exception", text
132 except js_e
.ValidationError
as e
:
134 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
135 print self
.name
, ": load_localinfo format Exception:", text
, e
.message
136 except Exception as e
:
138 print self
.name
, ": load_localinfo Exception:", text
140 #not loaded, insert a default data and force saving by activating dirty flag
141 self
.localinfo
= {'files':{}, 'server_files':{} }
142 #self.localinfo_dirty=True
143 self
.localinfo_dirty
=False
145 def load_hostinfo(self
):
153 command
= 'cat ' + self
.image_path
+ '/hostinfo.yaml'
154 #print self.name, ': command:', command
155 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
156 content
= stdout
.read()
157 if len(content
) == 0:
158 print self
.name
, ': command:', command
, "stderr:", stderr
.read()
159 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
160 self
.hostinfo
= yaml
.load(content
)
161 js_v(self
.hostinfo
, hostinfo_schema
)
162 print self
.name
, ': hostlinfo load from host', self
.hostinfo
165 except paramiko
.ssh_exception
.SSHException
as e
:
167 print self
.name
, ": load_hostinfo ssh Exception:", text
168 except libvirt
.libvirtError
as e
:
169 text
= e
.get_error_message()
170 print self
.name
, ": load_hostinfo libvirt Exception:", text
171 except yaml
.YAMLError
as exc
:
173 if hasattr(exc
, 'problem_mark'):
174 mark
= exc
.problem_mark
175 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
176 print self
.name
, ": load_hostinfo yaml format Exception", text
177 except js_e
.ValidationError
as e
:
179 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
180 print self
.name
, ": load_hostinfo format Exception:", text
, e
.message
181 except Exception as e
:
183 print self
.name
, ": load_hostinfo Exception:", text
185 #not loaded, insert a default data
188 def save_localinfo(self
, tries
=3):
190 self
.localinfo_dirty
= False
197 command
= 'cat > ' + self
.image_path
+ '/.openvim.yaml'
198 print self
.name
, ': command:', command
199 (stdin
, _
, _
) = self
.ssh_conn
.exec_command(command
)
200 yaml
.safe_dump(self
.localinfo
, stdin
, explicit_start
=True, indent
=4, default_flow_style
=False, tags
=False, encoding
='utf-8', allow_unicode
=True)
201 self
.localinfo_dirty
= False
204 except paramiko
.ssh_exception
.SSHException
as e
:
206 print self
.name
, ": save_localinfo ssh Exception:", text
207 if "SSH session not active" in text
:
209 except libvirt
.libvirtError
as e
:
210 text
= e
.get_error_message()
211 print self
.name
, ": save_localinfo libvirt Exception:", text
212 except yaml
.YAMLError
as exc
:
214 if hasattr(exc
, 'problem_mark'):
215 mark
= exc
.problem_mark
216 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
217 print self
.name
, ": save_localinfo yaml format Exception", text
218 except Exception as e
:
220 print self
.name
, ": save_localinfo Exception:", text
222 def load_servers_from_db(self
):
223 self
.db_lock
.acquire()
224 r
,c
= self
.db
.get_table(SELECT
=('uuid','status', 'image_id'), FROM
='instances', WHERE
={'host_id': self
.host_id
})
225 self
.db_lock
.release()
227 self
.server_status
= {}
229 print self
.name
, ": Error getting data from database:", c
232 self
.server_status
[ server
['uuid'] ] = server
['status']
234 #convert from old version to new one
235 if 'inc_files' in self
.localinfo
and server
['uuid'] in self
.localinfo
['inc_files']:
236 server_files_dict
= {'source file': self
.localinfo
['inc_files'][ server
['uuid'] ] [0], 'file format':'raw' }
237 if server_files_dict
['source file'][-5:] == 'qcow2':
238 server_files_dict
['file format'] = 'qcow2'
240 self
.localinfo
['server_files'][ server
['uuid'] ] = { server
['image_id'] : server_files_dict
}
241 if 'inc_files' in self
.localinfo
:
242 del self
.localinfo
['inc_files']
243 self
.localinfo_dirty
= True
245 def delete_unused_files(self
):
246 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
247 Deletes unused entries at self.loacalinfo and the corresponding local files.
248 The only reason for this mismatch is the manual deletion of instances (VM) at database
252 for uuid
,images
in self
.localinfo
['server_files'].items():
253 if uuid
not in self
.server_status
:
254 for localfile
in images
.values():
256 print self
.name
, ": deleting file '%s' of unused server '%s'" %(localfile
['source file'], uuid
)
257 self
.delete_file(localfile
['source file'])
258 except paramiko
.ssh_exception
.SSHException
as e
:
259 print self
.name
, ": Exception deleting file '%s': %s" %(localfile
['source file'], str(e
))
260 del self
.localinfo
['server_files'][uuid
]
261 self
.localinfo_dirty
= True
263 def insert_task(self
, task
, *aditional
):
265 self
.queueLock
.acquire()
266 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
267 self
.queueLock
.release()
270 return -1, "timeout inserting a task over host " + self
.name
274 self
.load_localinfo()
276 self
.load_servers_from_db()
277 self
.delete_unused_files()
279 self
.queueLock
.acquire()
280 if not self
.taskQueue
.empty():
281 task
= self
.taskQueue
.get()
284 self
.queueLock
.release()
288 if self
.localinfo_dirty
:
289 self
.save_localinfo()
290 elif self
.next_update_server_status
< now
:
291 self
.update_servers_status()
292 self
.next_update_server_status
= now
+ 5
293 elif len(self
.pending_terminate_server
)>0 and self
.pending_terminate_server
[0][0]<now
:
294 self
.server_forceoff()
299 if task
[0] == 'instance':
300 print self
.name
, ": processing task instance", task
[1]['action']
304 r
=self
.action_on_server(task
[1], retry
==2)
307 elif task
[0] == 'image':
309 elif task
[0] == 'exit':
310 print self
.name
, ": processing task exit"
313 elif task
[0] == 'reload':
314 print self
.name
, ": processing task reload terminating and relaunching"
317 elif task
[0] == 'edit-iface':
318 print self
.name
, ": processing task edit-iface port=%s, old_net=%s, new_net=%s" % (task
[1], task
[2], task
[3])
319 self
.edit_iface(task
[1], task
[2], task
[3])
320 elif task
[0] == 'restore-iface':
321 print self
.name
, ": processing task restore-iface %s mac=%s" % (task
[1], task
[2])
322 self
.restore_iface(task
[1], task
[2])
324 print self
.name
, ": unknown task", task
326 def server_forceoff(self
, wait_until_finished
=False):
327 while len(self
.pending_terminate_server
)>0:
329 if self
.pending_terminate_server
[0][0]>now
:
330 if wait_until_finished
:
335 req
={'uuid':self
.pending_terminate_server
[0][1],
336 'action':{'terminate':'force'},
339 self
.action_on_server(req
)
340 self
.pending_terminate_server
.pop(0)
344 self
.server_forceoff(True)
345 if self
.localinfo_dirty
:
346 self
.save_localinfo()
348 self
.ssh_conn
.close()
349 except Exception as e
:
351 print self
.name
, ": terminate Exception:", text
352 print self
.name
, ": exit from host_thread"
354 def get_local_iface_name(self
, generic_name
):
355 if self
.hostinfo
!= None and "iface_names" in self
.hostinfo
and generic_name
in self
.hostinfo
["iface_names"]:
356 return self
.hostinfo
["iface_names"][generic_name
]
359 def create_xml_server(self
, server
, dev_list
, server_metadata
={}):
360 """Function that implements the generation of the VM XML definition.
361 Additional devices are in dev_list list
362 The main disk is upon dev_list[0]"""
364 #get if operating system is Windows
366 os_type
= server_metadata
.get('os_type', None)
367 if os_type
== None and 'metadata' in dev_list
[0]:
368 os_type
= dev_list
[0]['metadata'].get('os_type', None)
369 if os_type
!= None and os_type
.lower() == "windows":
371 #get type of hard disk bus
372 bus_ide
= True if windows_os
else False
373 bus
= server_metadata
.get('bus', None)
374 if bus
== None and 'metadata' in dev_list
[0]:
375 bus
= dev_list
[0]['metadata'].get('bus', None)
377 bus_ide
= True if bus
=='ide' else False
381 text
= "<domain type='kvm'>"
383 topo
= server_metadata
.get('topology', None)
384 if topo
== None and 'metadata' in dev_list
[0]:
385 topo
= dev_list
[0]['metadata'].get('topology', None)
387 name
= server
.get('name','') + "_" + server
['uuid']
388 name
= name
[:58] #qemu impose a length limit of 59 chars or not start. Using 58
389 text
+= self
.inc_tab() + "<name>" + name
+ "</name>"
391 text
+= self
.tab() + "<uuid>" + server
['uuid'] + "</uuid>"
394 if 'extended' in server
and server
['extended']!=None and 'numas' in server
['extended']:
395 numa
= server
['extended']['numas'][0]
398 memory
= int(numa
.get('memory',0))*1024*1024 #in KiB
400 memory
= int(server
['ram'])*1024;
402 if not self
.develop_mode
:
405 return -1, 'No memory assigned to instance'
407 text
+= self
.tab() + "<memory unit='KiB'>" +memory
+"</memory>"
408 text
+= self
.tab() + "<currentMemory unit='KiB'>" +memory
+ "</currentMemory>"
410 text
+= self
.tab()+'<memoryBacking>'+ \
411 self
.inc_tab() + '<hugepages/>'+ \
412 self
.dec_tab()+ '</memoryBacking>'
415 use_cpu_pinning
=False
416 vcpus
= int(server
.get("vcpus",0))
418 if 'cores-source' in numa
:
420 for index
in range(0, len(numa
['cores-source'])):
421 cpu_pinning
.append( [ numa
['cores-id'][index
], numa
['cores-source'][index
] ] )
423 if 'threads-source' in numa
:
425 for index
in range(0, len(numa
['threads-source'])):
426 cpu_pinning
.append( [ numa
['threads-id'][index
], numa
['threads-source'][index
] ] )
428 if 'paired-threads-source' in numa
:
430 for index
in range(0, len(numa
['paired-threads-source'])):
431 cpu_pinning
.append( [numa
['paired-threads-id'][index
][0], numa
['paired-threads-source'][index
][0] ] )
432 cpu_pinning
.append( [numa
['paired-threads-id'][index
][1], numa
['paired-threads-source'][index
][1] ] )
435 if use_cpu_pinning
and not self
.develop_mode
:
436 text
+= self
.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning
)) +"</vcpu>" + \
437 self
.tab()+'<cputune>'
439 for i
in range(0, len(cpu_pinning
)):
440 text
+= self
.tab() + "<vcpupin vcpu='" +str(cpu_pinning
[i
][0])+ "' cpuset='" +str(cpu_pinning
[i
][1]) +"'/>"
441 text
+= self
.dec_tab()+'</cputune>'+ \
442 self
.tab() + '<numatune>' +\
443 self
.inc_tab() + "<memory mode='strict' nodeset='" +str(numa
['source'])+ "'/>" +\
444 self
.dec_tab() + '</numatune>'
447 return -1, "Instance without number of cpus"
448 text
+= self
.tab()+"<vcpu>" + str(vcpus
) + "</vcpu>"
453 if dev
['type']=='cdrom' :
456 text
+= self
.tab()+ '<os>' + \
457 self
.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
459 text
+= self
.tab() + "<boot dev='cdrom'/>"
460 text
+= self
.tab() + "<boot dev='hd'/>" + \
461 self
.dec_tab()+'</os>'
463 text
+= self
.tab()+'<features>'+\
464 self
.inc_tab()+'<acpi/>' +\
465 self
.tab()+'<apic/>' +\
466 self
.tab()+'<pae/>'+ \
467 self
.dec_tab() +'</features>'
468 if windows_os
or topo
=="oneSocket":
469 text
+= self
.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>"% vcpus
471 text
+= self
.tab() + "<cpu mode='host-model'></cpu>"
472 text
+= self
.tab() + "<clock offset='utc'/>" +\
473 self
.tab() + "<on_poweroff>preserve</on_poweroff>" + \
474 self
.tab() + "<on_reboot>restart</on_reboot>" + \
475 self
.tab() + "<on_crash>restart</on_crash>"
476 text
+= self
.tab() + "<devices>" + \
477 self
.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
478 self
.tab() + "<serial type='pty'>" +\
479 self
.inc_tab() + "<target port='0'/>" + \
480 self
.dec_tab() + "</serial>" +\
481 self
.tab() + "<console type='pty'>" + \
482 self
.inc_tab()+ "<target type='serial' port='0'/>" + \
483 self
.dec_tab()+'</console>'
485 text
+= self
.tab() + "<controller type='usb' index='0'/>" + \
486 self
.tab() + "<controller type='ide' index='0'/>" + \
487 self
.tab() + "<input type='mouse' bus='ps2'/>" + \
488 self
.tab() + "<sound model='ich6'/>" + \
489 self
.tab() + "<video>" + \
490 self
.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
491 self
.dec_tab() + "</video>" + \
492 self
.tab() + "<memballoon model='virtio'/>" + \
493 self
.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
495 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
496 #> self.dec_tab()+'</hostdev>\n' +\
497 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
499 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
501 #If image contains 'GRAPH' include graphics
502 #if 'GRAPH' in image:
503 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
504 self
.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
505 self
.dec_tab() + "</graphics>"
509 bus_ide_dev
= bus_ide
510 if dev
['type']=='cdrom' or dev
['type']=='disk':
511 if dev
['type']=='cdrom':
513 text
+= self
.tab() + "<disk type='file' device='"+dev
['type']+"'>"
514 if 'file format' in dev
:
515 text
+= self
.inc_tab() + "<driver name='qemu' type='" +dev
['file format']+ "' cache='writethrough'/>"
516 if 'source file' in dev
:
517 text
+= self
.tab() + "<source file='" +dev
['source file']+ "'/>"
518 #elif v['type'] == 'block':
519 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
521 # return -1, 'Unknown disk type ' + v['type']
522 vpci
= dev
.get('vpci',None)
524 vpci
= dev
['metadata'].get('vpci',None)
525 text
+= self
.pci2xml(vpci
)
528 text
+= self
.tab() + "<target dev='hd" +vd_index
+ "' bus='ide'/>" #TODO allows several type of disks
530 text
+= self
.tab() + "<target dev='vd" +vd_index
+ "' bus='virtio'/>"
531 text
+= self
.dec_tab() + '</disk>'
532 vd_index
= chr(ord(vd_index
)+1)
533 elif dev
['type']=='xml':
534 dev_text
= dev
['xml']
536 dev_text
= dev_text
.replace('__vpci__', dev
['vpci'])
537 if 'source file' in dev
:
538 dev_text
= dev_text
.replace('__file__', dev
['source file'])
539 if 'file format' in dev
:
540 dev_text
= dev_text
.replace('__format__', dev
['source file'])
541 if '__dev__' in dev_text
:
542 dev_text
= dev_text
.replace('__dev__', vd_index
)
543 vd_index
= chr(ord(vd_index
)+1)
546 return -1, 'Unknown device type ' + dev
['type']
549 bridge_interfaces
= server
.get('networks', [])
550 for v
in bridge_interfaces
:
552 self
.db_lock
.acquire()
553 result
, content
= self
.db
.get_table(FROM
='nets', SELECT
=('provider',),WHERE
={'uuid':v
['net_id']} )
554 self
.db_lock
.release()
556 print "create_xml_server ERROR getting nets",result
, content
558 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
559 #I know it is not secure
560 #for v in sorted(desc['network interfaces'].itervalues()):
561 model
= v
.get("model", None)
562 if content
[0]['provider']=='default':
563 text
+= self
.tab() + "<interface type='network'>" + \
564 self
.inc_tab() + "<source network='" +content
[0]['provider']+ "'/>"
565 elif content
[0]['provider'][0:7]=='macvtap':
566 text
+= self
.tab()+"<interface type='direct'>" + \
567 self
.inc_tab() + "<source dev='" + self
.get_local_iface_name(content
[0]['provider'][8:]) + "' mode='bridge'/>" + \
568 self
.tab() + "<target dev='macvtap0'/>"
570 text
+= self
.tab() + "<alias name='net" + str(net_nb
) + "'/>"
573 elif content
[0]['provider'][0:6]=='bridge':
574 text
+= self
.tab() + "<interface type='bridge'>" + \
575 self
.inc_tab()+"<source bridge='" +self
.get_local_iface_name(content
[0]['provider'][7:])+ "'/>"
577 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
578 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
582 return -1, 'Unknown Bridge net provider ' + content
[0]['provider']
584 text
+= self
.tab() + "<model type='" +model
+ "'/>"
585 if v
.get('mac_address', None) != None:
586 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
587 text
+= self
.pci2xml(v
.get('vpci',None))
588 text
+= self
.dec_tab()+'</interface>'
592 interfaces
= numa
.get('interfaces', [])
596 if self
.develop_mode
: #map these interfaces to bridges
597 text
+= self
.tab() + "<interface type='bridge'>" + \
598 self
.inc_tab()+"<source bridge='" +self
.develop_bridge_iface
+ "'/>"
600 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
601 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
603 text
+= self
.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
604 if v
.get('mac_address', None) != None:
605 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
606 text
+= self
.pci2xml(v
.get('vpci',None))
607 text
+= self
.dec_tab()+'</interface>'
610 if v
['dedicated'] == 'yes': #passthrought
611 text
+= self
.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
612 self
.inc_tab() + "<source>"
614 text
+= self
.pci2xml(v
['source'])
615 text
+= self
.dec_tab()+'</source>'
616 text
+= self
.pci2xml(v
.get('vpci',None))
618 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
619 text
+= self
.dec_tab()+'</hostdev>'
621 else: #sriov_interfaces
622 #skip not connected interfaces
623 if v
.get("net_id") == None:
625 text
+= self
.tab() + "<interface type='hostdev' managed='yes'>"
627 if v
.get('mac_address', None) != None:
628 text
+= self
.tab() + "<mac address='" +v
['mac_address']+ "'/>"
629 text
+= self
.tab()+'<source>'
631 text
+= self
.pci2xml(v
['source'])
632 text
+= self
.dec_tab()+'</source>'
633 if v
.get('vlan',None) != None:
634 text
+= self
.tab() + "<vlan> <tag id='" + str(v
['vlan']) + "'/> </vlan>"
635 text
+= self
.pci2xml(v
.get('vpci',None))
637 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
638 text
+= self
.dec_tab()+'</interface>'
641 text
+= self
.dec_tab()+'</devices>'+\
642 self
.dec_tab()+'</domain>'
645 def pci2xml(self
, pci
):
646 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
647 alows an empty pci text'''
650 first_part
= pci
.split(':')
651 second_part
= first_part
[2].split('.')
652 return self
.tab() + "<address type='pci' domain='0x" + first_part
[0] + \
653 "' bus='0x" + first_part
[1] + "' slot='0x" + second_part
[0] + \
654 "' function='0x" + second_part
[1] + "'/>"
657 """Return indentation according to xml_level"""
658 return "\n" + (' '*self
.xml_level
)
661 """Increment and return indentation according to xml_level"""
666 """Decrement and return indentation according to xml_level"""
670 def get_file_info(self
, path
):
671 command
= 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
672 print self
.name
, ': command:', command
673 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
674 content
= stdout
.read()
675 if len(content
) == 0:
676 return None # file does not exist
678 return content
.split(" ") #(permission, 1, owner, group, size, date, file)
680 def qemu_get_info(self
, path
):
681 command
= 'qemu-img info ' + path
682 print self
.name
, ': command:', command
683 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
684 content
= stdout
.read()
685 if len(content
) == 0:
686 error
= stderr
.read()
687 print self
.name
, ": get_qemu_info error ", error
688 raise paramiko
.ssh_exception
.SSHException("Error getting qemu_info: " + error
)
691 return yaml
.load(content
)
692 except yaml
.YAMLError
as exc
:
694 if hasattr(exc
, 'problem_mark'):
695 mark
= exc
.problem_mark
696 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
697 print self
.name
, ": get_qemu_info yaml format Exception", text
698 raise paramiko
.ssh_exception
.SSHException("Error getting qemu_info yaml format" + text
)
700 def qemu_change_backing(self
, inc_file
, new_backing_file
):
701 command
= 'qemu-img rebase -u -b ' + new_backing_file
+ ' ' + inc_file
702 print self
.name
, ': command:', command
703 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
704 content
= stderr
.read()
705 if len(content
) == 0:
708 print self
.name
, ": qemu_change_backing error: ", content
711 def get_notused_filename(self
, proposed_name
, suffix
=''):
712 '''Look for a non existing file_name in the host
713 proposed_name: proposed file name, includes path
714 suffix: suffix to be added to the name, before the extention
716 extension
= proposed_name
.rfind(".")
717 slash
= proposed_name
.rfind("/")
718 if extension
< 0 or extension
< slash
: # no extension
719 extension
= len(proposed_name
)
720 target_name
= proposed_name
[:extension
] + suffix
+ proposed_name
[extension
:]
721 info
= self
.get_file_info(target_name
)
726 while info
is not None:
727 target_name
= proposed_name
[:extension
] + suffix
+ "-" + str(index
) + proposed_name
[extension
:]
729 info
= self
.get_file_info(target_name
)
732 def get_notused_path(self
, proposed_path
, suffix
=''):
733 '''Look for a non existing path at database for images
734 proposed_path: proposed file name, includes path
735 suffix: suffix to be added to the name, before the extention
737 extension
= proposed_path
.rfind(".")
739 extension
= len(proposed_path
)
741 target_path
= proposed_path
[:extension
] + suffix
+ proposed_path
[extension
:]
744 r
,_
=self
.db
.get_table(FROM
="images",WHERE
={"path":target_path
})
747 target_path
= proposed_path
[:extension
] + suffix
+ "-" + str(index
) + proposed_path
[extension
:]
751 def delete_file(self
, file_name
):
752 command
= 'rm -f '+file_name
753 print self
.name
, ': command:', command
754 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
755 error_msg
= stderr
.read()
756 if len(error_msg
) > 0:
757 raise paramiko
.ssh_exception
.SSHException("Error deleting file: " + error_msg
)
759 def copy_file(self
, source
, destination
, perserve_time
=True):
760 command
= 'cp --no-preserve=mode '
761 if perserve_time
: command
+= '--preserve=timestamps '
762 command
+= source
+ ' ' + destination
763 print self
.name
, ': command:', command
764 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
765 error_msg
= stderr
.read()
766 if len(error_msg
) > 0:
767 raise paramiko
.ssh_exception
.SSHException("Error copying image to local host: " + error_msg
)
769 def copy_remote_file(self
, remote_file
, use_incremental
):
770 ''' Copy a file from the repository to local folder and recursively
771 copy the backing files in case the remote file is incremental
772 Read and/or modified self.localinfo['files'] that contain the
773 unmodified copies of images in the local path
775 remote_file: path of remote file
776 use_incremental: None (leave the decision to this function), True, False
778 local_file: name of local file
779 qemu_info: dict with quemu information of local file
780 use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
783 use_incremental_out
= use_incremental
784 new_backing_file
= None
787 #in case incremental use is not decided, take the decision depending on the image
788 #avoid the use of incremental if this image is already incremental
789 qemu_remote_info
= self
.qemu_get_info(remote_file
)
790 if use_incremental_out
==None:
791 use_incremental_out
= not 'backing file' in qemu_remote_info
792 #copy recursivelly the backing files
793 if 'backing file' in qemu_remote_info
:
794 new_backing_file
, _
, _
= self
.copy_remote_file(qemu_remote_info
['backing file'], True)
796 #check if remote file is present locally
797 if use_incremental_out
and remote_file
in self
.localinfo
['files']:
798 local_file
= self
.localinfo
['files'][remote_file
]
799 local_file_info
= self
.get_file_info(local_file
)
800 remote_file_info
= self
.get_file_info(remote_file
)
801 if local_file_info
== None:
803 elif local_file_info
[4]!=remote_file_info
[4] or local_file_info
[5]!=remote_file_info
[5]:
804 #local copy of file not valid because date or size are different.
805 #TODO DELETE local file if this file is not used by any active virtual machine
807 self
.delete_file(local_file
)
808 del self
.localinfo
['files'][remote_file
]
812 else: #check that the local file has the same backing file, or there are not backing at all
813 qemu_info
= self
.qemu_get_info(local_file
)
814 if new_backing_file
!= qemu_info
.get('backing file'):
818 if local_file
== None: #copy the file
819 img_name
= remote_file
.split('/') [-1]
820 img_local
= self
.image_path
+ '/' + img_name
821 local_file
= self
.get_notused_filename(img_local
)
822 self
.copy_file(remote_file
, local_file
, use_incremental_out
)
824 if use_incremental_out
:
825 self
.localinfo
['files'][remote_file
] = local_file
827 self
.qemu_change_backing(local_file
, new_backing_file
)
828 qemu_info
= self
.qemu_get_info(local_file
)
830 return local_file
, qemu_info
, use_incremental_out
832 def launch_server(self
, conn
, server
, rebuild
=False, domain
=None):
834 time
.sleep(random
.randint(20,150)) #sleep random timeto be make it a bit more real
837 server_id
= server
['uuid']
838 paused
= server
.get('paused','no')
840 if domain
!=None and rebuild
==False:
842 #self.server_status[server_id] = 'ACTIVE'
845 self
.db_lock
.acquire()
846 result
, server_data
= self
.db
.get_instance(server_id
)
847 self
.db_lock
.release()
849 print self
.name
, ": launch_server ERROR getting server from DB",result
, server_data
850 return result
, server_data
852 #0: get image metadata
853 server_metadata
= server
.get('metadata', {})
854 use_incremental
= None
856 if "use_incremental" in server_metadata
:
857 use_incremental
= False if server_metadata
["use_incremental"]=="no" else True
859 server_host_files
= self
.localinfo
['server_files'].get( server
['uuid'], {})
861 #delete previous incremental files
862 for file_
in server_host_files
.values():
863 self
.delete_file(file_
['source file'] )
866 #1: obtain aditional devices (disks)
867 #Put as first device the main disk
868 devices
= [ {"type":"disk", "image_id":server
['image_id'], "vpci":server_metadata
.get('vpci', None) } ]
869 if 'extended' in server_data
and server_data
['extended']!=None and "devices" in server_data
['extended']:
870 devices
+= server_data
['extended']['devices']
873 if dev
['image_id'] == None:
876 self
.db_lock
.acquire()
877 result
, content
= self
.db
.get_table(FROM
='images', SELECT
=('path','metadata'),WHERE
={'uuid':dev
['image_id']} )
878 self
.db_lock
.release()
880 error_text
= "ERROR", result
, content
, "when getting image", dev
['image_id']
881 print self
.name
, ": launch_server", error_text
882 return -1, error_text
883 if content
[0]['metadata'] is not None:
884 dev
['metadata'] = json
.loads(content
[0]['metadata'])
888 if dev
['image_id'] in server_host_files
:
889 dev
['source file'] = server_host_files
[ dev
['image_id'] ] ['source file'] #local path
890 dev
['file format'] = server_host_files
[ dev
['image_id'] ] ['file format'] # raw or qcow2
893 #2: copy image to host
894 remote_file
= content
[0]['path']
895 use_incremental_image
= use_incremental
896 if dev
['metadata'].get("use_incremental") == "no":
897 use_incremental_image
= False
898 local_file
, qemu_info
, use_incremental_image
= self
.copy_remote_file(remote_file
, use_incremental_image
)
900 #create incremental image
901 if use_incremental_image
:
902 local_file_inc
= self
.get_notused_filename(local_file
, '.inc')
903 command
= 'qemu-img create -f qcow2 '+local_file_inc
+ ' -o backing_file='+ local_file
904 print 'command:', command
905 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
906 error_msg
= stderr
.read()
907 if len(error_msg
) > 0:
908 raise paramiko
.ssh_exception
.SSHException("Error creating incremental file: " + error_msg
)
909 local_file
= local_file_inc
910 qemu_info
= {'file format':'qcow2'}
912 server_host_files
[ dev
['image_id'] ] = {'source file': local_file
, 'file format': qemu_info
['file format']}
914 dev
['source file'] = local_file
915 dev
['file format'] = qemu_info
['file format']
917 self
.localinfo
['server_files'][ server
['uuid'] ] = server_host_files
918 self
.localinfo_dirty
= True
921 result
, xml
= self
.create_xml_server(server_data
, devices
, server_metadata
) #local_file
923 print self
.name
, ": create xml server error:", xml
925 print self
.name
, ": create xml:", xml
926 atribute
= libvirt
.VIR_DOMAIN_START_PAUSED
if paused
== "yes" else 0
928 if not rebuild
: #ensures that any pending destroying server is done
929 self
.server_forceoff(True)
930 #print self.name, ": launching instance" #, xml
931 conn
.createXML(xml
, atribute
)
932 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
936 except paramiko
.ssh_exception
.SSHException
as e
:
938 print self
.name
, ": launch_server(%s) ssh Exception: %s" %(server_id
, text
)
939 if "SSH session not active" in text
:
941 except libvirt
.libvirtError
as e
:
942 text
= e
.get_error_message()
943 print self
.name
, ": launch_server(%s) libvirt Exception: %s" %(server_id
, text
)
944 except Exception as e
:
946 print self
.name
, ": launch_server(%s) Exception: %s" %(server_id
, text
)
    def update_servers_status(self):
        """Poll libvirt for the state of every domain on this host and
        propagate any status change of a tracked server to the 'instances'
        table in the database (and to the local self.server_status cache).

        NOTE(review): this block was recovered from a mangled extraction;
        several original lines are missing and are flagged inline. Confirm
        against the upstream openvim host_thread.py before relying on the
        reconstructed structure.
        """
        # libvirt numeric domain states, for reference:
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended

        # Nothing to poll in test mode or when no server is being tracked.
        # NOTE(review): the body of this guard (orig. lines 961-963, presumably
        # a 'return', the 'domain_dict' initialisation and a 'try:') was
        # dropped by the extraction.
        if self.test or len(self.server_status)==0:

        conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
        domains = conn.listAllDomains()
        for domain in domains:
            uuid = domain.UUIDString() ;
            libvirt_status = domain.state()
            #print libvirt_status
            # Map the libvirt state to the openvim instance status vocabulary.
            if libvirt_status[0] == libvirt.VIR_DOMAIN_RUNNING or libvirt_status[0] == libvirt.VIR_DOMAIN_SHUTDOWN:
                new_status = "ACTIVE"
            elif libvirt_status[0] == libvirt.VIR_DOMAIN_PAUSED:
                new_status = "PAUSED"
            elif libvirt_status[0] == libvirt.VIR_DOMAIN_SHUTOFF:
                new_status = "INACTIVE"
            elif libvirt_status[0] == libvirt.VIR_DOMAIN_CRASHED:
                # NOTE(review): the CRASHED branch body and the final 'else'
                # (orig. lines 978-980, presumably new_status = "ERROR" /
                # new_status = None) were dropped by the extraction.
            domain_dict[uuid] = new_status
        except libvirt.libvirtError as e:
            print self.name, ": get_state() Exception '", e.get_error_message()
            # NOTE(review): the error-path return (orig. lines 985-986) was
            # dropped by the extraction.

        # Compare the freshly polled states against the cached ones and push
        # every change to the database.
        for server_id, current_status in self.server_status.iteritems():
            # NOTE(review): the 'new_status = None' reset (orig. line 988) was
            # dropped by the extraction.
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
                # NOTE(review): the 'else:' marking a domain no longer defined
                # in libvirt (orig. line 991) was dropped; the next line is its
                # body.
                new_status = "INACTIVE"
            # Skip unknown or unchanged states.
            # NOTE(review): the 'continue' body of this guard (orig. line 995)
            # was dropped by the extraction.
            if new_status == None or new_status == current_status:
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running
            print self.name, ": server ", server_id, "status change from ", current_status, "to", new_status
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            # NOTE(review): the guard on the DB result (orig. line 1006,
            # presumably 'if r>=0:') was dropped; the cache update below is
            # likely conditional on it.
            self.server_status[server_id] = new_status
    def action_on_server(self, req, last_retry=True):
        '''Perform an action on a req
        req: dictionary that contain:
            server properties: 'uuid','name','tenant_id','status'
            host properties: 'user', 'ip_name'
        last_retry: when False and the action ends in ERROR, the database is
            not updated (another retry will follow)
        return (error, text)
             0: No error. VM is updated to new state,
            -1: Invalid action, as trying to pause a PAUSED VM
            -2: Error accessing host
            -4: Error at DB access
            -5: Error while trying to perform action. VM is updated to ERROR

        NOTE(review): this block was recovered from a mangled extraction;
        several original lines are missing and are flagged inline. Confirm
        against the upstream openvim host_thread.py.
        '''
        server_id = req['uuid']
        old_status = req['status']
        # NOTE(review): local initialisations (orig. lines 1025-1030,
        # presumably conn/dom/new_status/last_error = None and the test-mode
        # branch header) were dropped by the extraction.

        # -- determine the target status implied by the requested action --
        if 'terminate' in req['action']:
            new_status = 'deleted'
        elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
            if req['status']!='ERROR':
                new_status = 'INACTIVE'
        elif 'start' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
        elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE' : new_status = 'ACTIVE'
        elif 'pause' in req['action'] and req['status']!='ERROR': new_status = 'PAUSED'
        elif 'reboot' in req['action'] and req['status']!='ERROR': new_status = 'ACTIVE'
        elif 'rebuild' in req['action']:
            # NOTE(review): purpose of this random 20-150s sleep is not visible
            # from this code -- presumably to stagger rebuilds; confirm.
            time.sleep(random.randint(20,150))
            new_status = 'ACTIVE'
        elif 'createImage' in req['action']:
            self.create_image(None, req)

        # -- real (non-test) path: drive libvirt over qemu+ssh --
        conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
        # NOTE(review): the 'try:' guarding the lookup (orig. line 1050) was
        # dropped by the extraction.
            dom = conn.lookupByUUIDString(server_id)
        except libvirt.libvirtError as e:
            text = e.get_error_message()
            if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                # domain not defined on this host
                # NOTE(review): this branch body (orig. lines 1055-1056,
                # presumably 'dom = None') was dropped by the extraction.
            print self.name, ": action_on_server(",server_id,") libvirt exception:", text
            # NOTE(review): the re-raise for other errors (orig. lines
            # 1058-1059) was dropped by the extraction.

        if 'forceOff' in req['action']:
            # NOTE(review): the 'if dom == None:' guard (orig. line 1061) was
            # dropped by the extraction.
                print self.name, ": action_on_server(",server_id,") domain not running"
            # NOTE(review): the else/'try:' (orig. lines 1063-1064) was dropped.
                print self.name, ": sending DESTROY to server", server_id
                # NOTE(review): the dom.destroy() call (orig. line 1066) was
                # dropped by the extraction.
            except Exception as e:
                if "domain is not running" not in e.get_error_message():
                    print self.name, ": action_on_server(",server_id,") Exception while sending force off:", e.get_error_message()
                    last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                    new_status = 'ERROR'

        elif 'terminate' in req['action']:
            # NOTE(review): the 'if dom == None:' guard (orig. line 1074) was
            # dropped by the extraction.
                print self.name, ": action_on_server(",server_id,") domain not running"
                new_status = 'deleted'
            # NOTE(review): the else/'try:' (orig. lines 1077-1078) was dropped.
                if req['action']['terminate'] == 'force':
                    print self.name, ": sending DESTROY to server", server_id
                    # NOTE(review): the dom.destroy() call (orig. line 1081)
                    # was dropped by the extraction.
                    new_status = 'deleted'
                # NOTE(review): the 'else:' for the graceful path (orig. line
                # 1083) was dropped; the next lines are its body.
                    print self.name, ": sending SHUTDOWN to server", server_id
                    # NOTE(review): the dom.shutdown() call (orig. line 1085)
                    # was dropped by the extraction.
                    # give the domain ~10s before forcing its destruction
                    self.pending_terminate_server.append( (time.time()+10,server_id) )
            except Exception as e:
                print self.name, ": action_on_server(",server_id,") Exception while destroy:", e.get_error_message()
                last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                new_status = 'ERROR'
                if "domain is not running" in e.get_error_message():
                    # NOTE(review): the undefine attempt (orig. lines
                    # 1092-1093) was dropped by the extraction.
                    new_status = 'deleted'
                    print self.name, ": action_on_server(",server_id,") Exception while undefine:", e.get_error_message()
                    # NOTE(review): this assigns a TUPLE, not a string (comma
                    # instead of '+') -- looks like a bug in the original.
                    last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
                    #Exception: 'virDomainDetachDevice() failed'
            # on successful deletion: forget the server locally and remove its
            # disk image files from this host
            if new_status=='deleted':
                if server_id in self.server_status:
                    del self.server_status[server_id]
                if req['uuid'] in self.localinfo['server_files']:
                    for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
                        # NOTE(review): a 'try:' around the deletion (orig.
                        # line 1104) was dropped by the extraction.
                        self.delete_file(file_['source file'])
                    del self.localinfo['server_files'][ req['uuid'] ]
                    self.localinfo_dirty = True

        elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
            # NOTE(review): the 'if dom == None:' guard (orig. lines
            # 1112-1113) was dropped by the extraction.
                print self.name, ": action_on_server(",server_id,") domain not running"
            # NOTE(review): the else/dom.shutdown() (orig. lines 1115-1116)
            # was dropped by the extraction.
                # new_status = 'INACTIVE'
                #TODO: check status for changing at database
            except Exception as e:
                new_status = 'ERROR'
                print self.name, ": action_on_server(",server_id,") Exception while shutdown:", e.get_error_message()
                last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()

        elif 'rebuild' in req['action']:
            # NOTE(review): the pre-rebuild destroy (orig. lines 1125-1126)
            # was dropped by the extraction.
            r = self.launch_server(conn, req, True, None)
            # NOTE(review): the result check 'if r<0:'/'else:' (orig. lines
            # 1128, 1130-1131) was dropped; failure branch:
                new_status = 'ERROR'
            # success branch:
                new_status = 'ACTIVE'
        elif 'start' in req['action']:
            # the instance exists only in the database, not in libvirt; it
            # must be created
            rebuild = True if req['action']['start']=='rebuild' else False
            r = self.launch_server(conn, req, rebuild, dom)
            # NOTE(review): the result check 'if r<0:'/'else:' (orig. lines
            # 1137, 1139-1140) was dropped; failure branch:
                new_status = 'ERROR'
            # success branch:
                new_status = 'ACTIVE'

        elif 'resume' in req['action']:
            # NOTE(review): the resume body (orig. lines 1144-1148, presumably
            # a 'try:' calling dom.resume()) was dropped by the extraction.
            # new_status = 'ACTIVE'
            except Exception as e:
                print self.name, ": action_on_server(",server_id,") Exception while resume:", e.get_error_message()

        elif 'pause' in req['action']:
            # NOTE(review): the pause body (orig. lines 1154-1158, presumably
            # a 'try:' calling dom.suspend()) was dropped by the extraction.
            # new_status = 'PAUSED'
            except Exception as e:
                print self.name, ": action_on_server(",server_id,") Exception while pause:", e.get_error_message()

        elif 'reboot' in req['action']:
            # NOTE(review): the reboot body (orig. lines 1164-1168, presumably
            # a 'try:' calling dom.reboot()) was dropped by the extraction.
            print self.name, ": action_on_server(",server_id,") reboot:"
            #new_status = 'ACTIVE'
            except Exception as e:
                print self.name, ": action_on_server(",server_id,") Exception while reboot:", e.get_error_message()
        elif 'createImage' in req['action']:
            self.create_image(dom, req)

        except libvirt.libvirtError as e:
            # NOTE(review): conn.close is referenced but NEVER CALLED (missing
            # '()') -- libvirt connection leak on this error path.
            if conn is not None: conn.close
            text = e.get_error_message()
            new_status = "ERROR"
            print self.name, ": action_on_server(",server_id,") Exception '", text
            if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                print self.name, ": action_on_server(",server_id,") Exception removed from host"
        #end of if self.test

        # -- persist the resulting status --
        if new_status == None:
            # NOTE(review): the early return for no-op actions (orig. lines
            # 1188-1189) was dropped by the extraction.
        print self.name, ": action_on_server(",server_id,") new status", new_status, last_error
        UPDATE = {'progress':100, 'status':new_status}
        if new_status=='ERROR':
            if not last_retry: #if there will be another retry do not update database
            elif 'terminate' in req['action']:
                #PUT a log in the database
                print self.name, ": PANIC deleting server", server_id, last_error
                self.db_lock.acquire()
                self.db.new_row('logs',
                    {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
                     'description':'PANIC deleting server from host '+self.name+': '+last_error}
                # NOTE(review): the closing ')' of new_row (orig. line 1203)
                # was dropped by the extraction.
                self.db_lock.release()
                if server_id in self.server_status:
                    del self.server_status[server_id]
            UPDATE['last_error'] = last_error
        if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
            self.db_lock.acquire()
            self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
            self.server_status[server_id] = new_status
            self.db_lock.release()
        if new_status == 'ERROR':
            # NOTE(review): the function tail (orig. lines 1216+) is not
            # present in this chunk.
    def restore_iface(self, name, mac, lib_conn=None):
        ''' make an ifdown, ifup to restore default parameter of na interface
        Params:
            name: interface name (used here for logging)
            mac: mac address of the interface
            lib_conn: connection to the libvirt, if None a new connection is created
        Return 0,None if ok, -1,text if fails

        NOTE(review): recovered from a mangled extraction; missing lines are
        flagged inline.
        '''
        # NOTE(review): local initialisations and the test-mode guard (orig.
        # lines 1227-1230, presumably conn=None, ret=0, error_text=None,
        # 'if self.test:') were dropped by the extraction.
        print self.name, ": restore_iface '%s' %s" % (name, mac)
        # Open an own connection only when the caller did not supply one;
        # NOTE(review): the surrounding 'try:'/'if not lib_conn:' (orig. lines
        # 1232-1234) and the else assigning conn=lib_conn (1236-1238) were
        # dropped by the extraction.
        conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
        #wait to the pending VM deletion
        #TODO.Revise self.server_forceoff(True)
        iface = conn.interfaceLookupByMACString(mac)
        # NOTE(review): the iface.destroy()/iface.create() calls that perform
        # the actual ifdown/ifup (orig. lines 1243-1244) were dropped by the
        # extraction.
        print self.name, ": restore_iface '%s' %s" % (name, mac)
        except libvirt.libvirtError as e:
            error_text = e.get_error_message()
            print self.name, ": restore_iface '%s' '%s' libvirt exception: %s" %(name, mac, error_text)
            # NOTE(review): the 'ret = -1' assignment (orig. lines 1249-1250)
            # was dropped by the extraction.
        # Close the connection only if it was opened by this method.
        if lib_conn is None and conn is not None:
            # NOTE(review): the conn.close() call (orig. line 1252) was
            # dropped by the extraction.
        return ret, error_text
    def create_image(self,dom, req):
        """Create a new image from a server disk file (copy + optional
        backing-file rebase) and record the result in the 'images' DB table.

        dom: libvirt domain of the source server (unused in the visible code;
            None in test mode)
        req: action request dict; reads req['action']['createImage'] (with
            'path' or 'source'/'name') and req['new_image']['uuid']

        NOTE(review): recovered from a mangled extraction; missing lines are
        flagged inline.
        """
        # -- test-mode branch --
        # NOTE(review): the 'if self.test:' guard (orig. line 1257) was
        # dropped by the extraction.
        if 'path' in req['action']['createImage']:
            file_dst = req['action']['createImage']['path']
        # NOTE(review): the 'else:' (orig. line 1260) was dropped; the next
        # lines derive a non-clashing destination next to the source image.
            createImage=req['action']['createImage']
            img_name= createImage['source']['path']
            index=img_name.rfind('/')
            file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
        image_status='ACTIVE'
        # -- real-mode branch --
        # NOTE(review): the branch header with a retry loop and 'try:' (orig.
        # lines 1266-1268) was dropped by the extraction.
            server_id = req['uuid']
            createImage=req['action']['createImage']
            file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
            if 'path' in req['action']['createImage']:
                file_dst = req['action']['createImage']['path']
            # NOTE(review): the 'else:' (orig. line 1274) was dropped by the
            # extraction.
                img_name= createImage['source']['path']
                index=img_name.rfind('/')
                file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')
            self.copy_file(file_orig, file_dst)
            qemu_info = self.qemu_get_info(file_orig)
            # If the source disk has a backing file known to this host,
            # repoint the copy to the local path of that backing file.
            if 'backing file' in qemu_info:
                for k,v in self.localinfo['files'].items():
                    if v==qemu_info['backing file']:
                        self.qemu_change_backing(file_dst, k)
            image_status='ACTIVE'
        except paramiko.ssh_exception.SSHException as e:
            image_status='ERROR'
            error_text = e.args[0]
            print self.name, "': create_image(",server_id,") ssh Exception:", error_text
            if "SSH session not active" in error_text and retry==0:
                # NOTE(review): the reconnection attempt (orig. line 1293) was
                # dropped by the extraction; 'retry' comes from the lost loop.
        except Exception as e:
            image_status='ERROR'
            # NOTE(review): the error_text assignment (orig. line 1296) was
            # dropped by the extraction.
            print self.name, "': create_image(",server_id,") Exception:", error_text
        #TODO insert a last_error at database
        self.db_lock.acquire()
        self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
            {'uuid':req['new_image']['uuid']}, log=True)
        self.db_lock.release()
    def edit_iface(self, port_id, old_net, new_net):
        """Re-plug an SR-IOV (VF) interface of a running VM so it picks up new
        network parameters (vlan tag, guest pci address).

        port_id: uuid of the port in the 'ports' DB table
        old_net, new_net: network uuids (not read in the visible code)

        NOTE(review): recovered from a mangled extraction; missing lines are
        flagged inline.
        """
        #This action imply remove and insert interface to put proper parameters
        # Fetch the port joined with its host resource row.
        self.db_lock.acquire()
        r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
            WHERE={'port_id': port_id})
        self.db_lock.release()
        # NOTE(review): the error/empty-result guards and 'port = c[0]' (orig.
        # lines 1315, 1317-1318, 1320-1321) were dropped by the extraction;
        # the next two prints are their bodies.
            print self.name, ": edit_iface(",port_id,") DDBB error:", c
            print self.name, ": edit_iface(",port_id,") por not found"
        # Only VF (SR-IOV) model ports can be hot re-plugged this way.
        if port["model"]!="VF":
            print self.name, ": edit_iface(",port_id,") ERROR model must be VF"
            # NOTE(review): the 'return' (orig. line 1324) was dropped by the
            # extraction.
        #create xml detach file
        # NOTE(review): the xml list initialisation (orig. lines 1326-1327)
        # was dropped by the extraction.
        xml.append("<interface type='hostdev' managed='yes'>")
        xml.append(" <mac address='" +port['mac']+ "'/>")
        xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
        xml.append('</interface>')

        # NOTE(review): the 'try:' and the 'text' join of the xml list (orig.
        # lines 1332-1335, 1338-1339) were dropped by the extraction.
        conn = libvirt.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
        dom = conn.lookupByUUIDString(port["instance_id"])
        # Detach the hostdev from the live domain...
        print self.name, ": edit_iface detaching SRIOV interface", text
        dom.detachDeviceFlags(text, flags=libvirt.VIR_DOMAIN_AFFECT_LIVE)
        # ...then rebuild the xml with the new vlan tag and optional guest pci
        # address and attach it again.
        xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
        xml.append(self.pci2xml(port.get('vpci',None)) )
        xml.append('</interface>')
        print self.name, ": edit_iface attaching SRIOV interface", text
        dom.attachDeviceFlags(text, flags=libvirt.VIR_DOMAIN_AFFECT_LIVE)
        except libvirt.libvirtError as e:
            text = e.get_error_message()
            print self.name, ": edit_iface(",port["instance_id"],") libvirt exception:", text
        # NOTE(review): conn.close is referenced but NEVER CALLED (missing
        # '()') -- libvirt connection leak.
        if conn is not None: conn.close
1359 def create_server(server
, db
, db_lock
, only_of_ports
):
1366 # host_id = server.get('host_id', None)
1367 extended
= server
.get('extended', None)
1369 # print '----------------------'
1370 # print json.dumps(extended, indent=4)
1373 requirements
['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
1374 requirements
['ram'] = server
['flavor'].get('ram', 0)
1375 if requirements
['ram']== None:
1376 requirements
['ram'] = 0
1377 requirements
['vcpus'] = server
['flavor'].get('vcpus', 0)
1378 if requirements
['vcpus']== None:
1379 requirements
['vcpus'] = 0
1380 #If extended is not defined get requirements from flavor
1381 if extended
is None:
1382 #If extended is defined in flavor convert to dictionary and use it
1383 if 'extended' in server
['flavor'] and server
['flavor']['extended'] != None:
1384 json_acceptable_string
= server
['flavor']['extended'].replace("'", "\"")
1385 extended
= json
.loads(json_acceptable_string
)
1388 #print json.dumps(extended, indent=4)
1390 #For simplicity only one numa VM are supported in the initial implementation
1391 if extended
!= None:
1392 numas
= extended
.get('numas', [])
1394 return (-2, "Multi-NUMA VMs are not supported yet")
1396 # return (-1, "At least one numa must be specified")
1398 #a for loop is used in order to be ready to multi-NUMA VMs
1402 numa_req
['memory'] = numa
.get('memory', 0)
1404 numa_req
['proc_req_nb'] = numa
['cores'] #number of cores or threads to be reserved
1405 numa_req
['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
1406 numa_req
['proc_req_list'] = numa
.get('cores-id', None) #list of ids to be assigned to the cores or threads
1407 elif 'paired-threads' in numa
:
1408 numa_req
['proc_req_nb'] = numa
['paired-threads']
1409 numa_req
['proc_req_type'] = 'paired-threads'
1410 numa_req
['proc_req_list'] = numa
.get('paired-threads-id', None)
1411 elif 'threads' in numa
:
1412 numa_req
['proc_req_nb'] = numa
['threads']
1413 numa_req
['proc_req_type'] = 'threads'
1414 numa_req
['proc_req_list'] = numa
.get('threads-id', None)
1416 numa_req
['proc_req_nb'] = 0 # by default
1417 numa_req
['proc_req_type'] = 'threads'
1421 #Generate a list of sriov and another for physical interfaces
1422 interfaces
= numa
.get('interfaces', [])
1425 for iface
in interfaces
:
1426 iface
['bandwidth'] = int(iface
['bandwidth'])
1427 if iface
['dedicated'][:3]=='yes':
1428 port_list
.append(iface
)
1430 sriov_list
.append(iface
)
1432 #Save lists ordered from more restrictive to less bw requirements
1433 numa_req
['sriov_list'] = sorted(sriov_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1434 numa_req
['port_list'] = sorted(port_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
1437 request
.append(numa_req
)
1439 # print "----------\n"+json.dumps(request[0], indent=4)
1440 # print '----------\n\n'
1442 #Search in db for an appropriate numa for each requested numa
1443 #at the moment multi-NUMA VMs are not supported
1445 requirements
['numa'].update(request
[0])
1446 if requirements
['numa']['memory']>0:
1447 requirements
['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
1448 elif requirements
['ram']==0:
1449 return (-1, "Memory information not set neither at extended field not at ram")
1450 if requirements
['numa']['proc_req_nb']>0:
1451 requirements
['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
1452 elif requirements
['vcpus']==0:
1453 return (-1, "Processor information not set neither at extended field not at vcpus")
1457 result
, content
= db
.get_numas(requirements
, server
.get('host_id', None), only_of_ports
)
1461 return (-1, content
)
1463 numa_id
= content
['numa_id']
1464 host_id
= content
['host_id']
1466 #obtain threads_id and calculate pinning
1469 if requirements
['numa']['proc_req_nb']>0:
1471 result
, content
= db
.get_table(FROM
='resources_core',
1472 SELECT
=('id','core_id','thread_id'),
1473 WHERE
={'numa_id':numa_id
,'instance_id': None, 'status':'ok'} )
1479 #convert rows to a dictionary indexed by core_id
1482 if not row
['core_id'] in cores_dict
:
1483 cores_dict
[row
['core_id']] = []
1484 cores_dict
[row
['core_id']].append([row
['thread_id'],row
['id']])
1486 #In case full cores are requested
1488 if requirements
['numa']['proc_req_type'] == 'cores':
1489 #Get/create the list of the vcpu_ids
1490 vcpu_id_list
= requirements
['numa']['proc_req_list']
1491 if vcpu_id_list
== None:
1492 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1494 for threads
in cores_dict
.itervalues():
1496 if len(threads
) != 2:
1499 #set pinning for the first thread
1500 cpu_pinning
.append( [ vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1] ] )
1502 #reserve so it is not used the second thread
1503 reserved_threads
.append(threads
[1][1])
1505 if len(vcpu_id_list
) == 0:
1508 #In case paired threads are requested
1509 elif requirements
['numa']['proc_req_type'] == 'paired-threads':
1511 #Get/create the list of the vcpu_ids
1512 if requirements
['numa']['proc_req_list'] != None:
1514 for pair
in requirements
['numa']['proc_req_list']:
1516 return -1, "Field paired-threads-id not properly specified"
1518 vcpu_id_list
.append(pair
[0])
1519 vcpu_id_list
.append(pair
[1])
1521 vcpu_id_list
= range(0,2*int(requirements
['numa']['proc_req_nb']))
1523 for threads
in cores_dict
.itervalues():
1525 if len(threads
) != 2:
1527 #set pinning for the first thread
1528 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1530 #set pinning for the second thread
1531 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1533 if len(vcpu_id_list
) == 0:
1536 #In case normal threads are requested
1537 elif requirements
['numa']['proc_req_type'] == 'threads':
1538 #Get/create the list of the vcpu_ids
1539 vcpu_id_list
= requirements
['numa']['proc_req_list']
1540 if vcpu_id_list
== None:
1541 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
1543 for threads_index
in sorted(cores_dict
, key
=lambda k
: len(cores_dict
[k
])):
1544 threads
= cores_dict
[threads_index
]
1545 #set pinning for the first thread
1546 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
1548 #if exists, set pinning for the second thread
1549 if len(threads
) == 2 and len(vcpu_id_list
) != 0:
1550 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
1552 if len(vcpu_id_list
) == 0:
1555 #Get the source pci addresses for the selected numa
1556 used_sriov_ports
= []
1557 for port
in requirements
['numa']['sriov_list']:
1559 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1565 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1567 port
['pci'] = row
['pci']
1568 if 'mac_address' not in port
:
1569 port
['mac_address'] = row
['mac']
1571 port
['port_id']=row
['id']
1572 port
['Mbps_used'] = port
['bandwidth']
1573 used_sriov_ports
.append(row
['id'])
1576 for port
in requirements
['numa']['port_list']:
1577 port
['Mbps_used'] = None
1578 if port
['dedicated'] != "yes:sriov":
1579 port
['mac_address'] = port
['mac']
1583 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac', 'Mbps'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
1588 port
['Mbps_used'] = content
[0]['Mbps']
1590 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
1592 port
['pci'] = row
['pci']
1593 if 'mac_address' not in port
:
1594 port
['mac_address'] = row
['mac'] # mac cannot be set to passthrough ports
1596 port
['port_id']=row
['id']
1597 used_sriov_ports
.append(row
['id'])
1600 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
1601 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
1603 server
['host_id'] = host_id
1606 #Generate dictionary for saving in db the instance resources
1608 resources
['bridged-ifaces'] = []
1611 numa_dict
['interfaces'] = []
1613 numa_dict
['interfaces'] += requirements
['numa']['port_list']
1614 numa_dict
['interfaces'] += requirements
['numa']['sriov_list']
1616 #Check bridge information
1617 unified_dataplane_iface
=[]
1618 unified_dataplane_iface
+= requirements
['numa']['port_list']
1619 unified_dataplane_iface
+= requirements
['numa']['sriov_list']
1621 for control_iface
in server
.get('networks', []):
1622 control_iface
['net_id']=control_iface
.pop('uuid')
1623 #Get the brifge name
1625 result
, content
= db
.get_table(FROM
='nets', SELECT
=('name','type', 'vlan'),WHERE
={'uuid':control_iface
['net_id']} )
1630 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface
['net_id']
1633 if control_iface
.get("type", 'virtual') == 'virtual':
1634 if network
['type']!='bridge_data' and network
['type']!='bridge_man':
1635 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface
['net_id']
1636 resources
['bridged-ifaces'].append(control_iface
)
1638 if network
['type']!='data' and network
['type']!='ptp':
1639 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface
['net_id']
1640 #dataplane interface, look for it in the numa tree and asign this network
1642 for dataplane_iface
in numa_dict
['interfaces']:
1643 if dataplane_iface
['name'] == control_iface
.get("name"):
1644 if (dataplane_iface
['dedicated'] == "yes" and control_iface
["type"] != "PF") or \
1645 (dataplane_iface
['dedicated'] == "no" and control_iface
["type"] != "VF") or \
1646 (dataplane_iface
['dedicated'] == "yes:sriov" and control_iface
["type"] != "VFnotShared") :
1647 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
1648 (control_iface
.get("name"), dataplane_iface
['dedicated'], control_iface
["type"])
1649 dataplane_iface
['uuid'] = control_iface
['net_id']
1650 if dataplane_iface
['dedicated'] == "no":
1651 dataplane_iface
['vlan'] = network
['vlan']
1652 if dataplane_iface
['dedicated'] != "yes" and control_iface
.get("mac_address"):
1653 dataplane_iface
['mac_address'] = control_iface
.get("mac_address")
1654 if control_iface
.get("vpci"):
1655 dataplane_iface
['vpci'] = control_iface
.get("vpci")
1659 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface
.get("name")
1661 resources
['host_id'] = host_id
1662 resources
['image_id'] = server
['image_id']
1663 resources
['flavor_id'] = server
['flavor_id']
1664 resources
['tenant_id'] = server
['tenant_id']
1665 resources
['ram'] = requirements
['ram']
1666 resources
['vcpus'] = requirements
['vcpus']
1667 resources
['status'] = 'CREATING'
1669 if 'description' in server
: resources
['description'] = server
['description']
1670 if 'name' in server
: resources
['name'] = server
['name']
1672 resources
['extended'] = {} #optional
1673 resources
['extended']['numas'] = []
1674 numa_dict
['numa_id'] = numa_id
1675 numa_dict
['memory'] = requirements
['numa']['memory']
1676 numa_dict
['cores'] = []
1678 for core
in cpu_pinning
:
1679 numa_dict
['cores'].append({'id': core
[2], 'vthread': core
[0], 'paired': paired
})
1680 for core
in reserved_threads
:
1681 numa_dict
['cores'].append({'id': core
})
1682 resources
['extended']['numas'].append(numa_dict
)
1683 if extended
!=None and 'devices' in extended
: #TODO allow extra devices without numa
1684 resources
['extended']['devices'] = extended
['devices']
1687 print '===================================={'
1688 print json
.dumps(resources
, indent
=4)
1689 print '====================================}'