1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
This is a thread that interacts with the host and with libvirt to manage VMs.
One thread will be launched per host.
# Module authorship metadata (values preserved from the original source).
__author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
__date__ = "$10-jul-2014 12:07:15$"
37 from jsonschema
import validate
as js_v
, exceptions
as js_e
40 from vim_schema
import localinfo_schema
, hostinfo_schema
46 class host_thread(threading
.Thread
):
49 def __init__(self
, name
, host
, user
, db
, db_lock
, test
, image_path
, host_id
, version
, develop_mode
,
50 develop_bridge_iface
, logger_name
=None, debug
=None):
55 'host','user': host ip or name to manage and user
56 'db', 'db_lock': database class and lock to use it in exclusion
58 threading
.Thread
.__init
__(self
)
63 self
.db_lock
= db_lock
65 self
.localinfo_dirty
= False
67 if not test
and not host_thread
.lvirt_module
:
69 module_info
= imp
.find_module("libvirt")
70 host_thread
.lvirt_module
= imp
.load_module("libvirt", *module_info
)
71 except (IOError, ImportError) as e
:
72 raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e
))
74 self
.logger_name
= logger_name
76 self
.logger_name
= "openvim.host."+name
77 self
.logger
= logging
.getLogger(self
.logger_name
)
79 self
.logger
.setLevel(getattr(logging
, debug
))
82 self
.develop_mode
= develop_mode
83 self
.develop_bridge_iface
= develop_bridge_iface
84 self
.image_path
= image_path
85 self
.host_id
= host_id
86 self
.version
= version
91 self
.server_status
= {} #dictionary with pairs server_uuid:server_status
92 self
.pending_terminate_server
=[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
93 self
.next_update_server_status
= 0 #time when must be check servers status
97 self
.queueLock
= threading
.Lock()
98 self
.taskQueue
= Queue
.Queue(2000)
101 def ssh_connect(self
):
104 self
.ssh_conn
= paramiko
.SSHClient()
105 self
.ssh_conn
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
106 self
.ssh_conn
.load_system_host_keys()
107 self
.ssh_conn
.connect(self
.host
, username
=self
.user
, timeout
=10) #, None)
108 except paramiko
.ssh_exception
.SSHException
as e
:
110 self
.logger
.error("ssh_connect ssh Exception: " + text
)
112 def load_localinfo(self
):
118 command
= 'mkdir -p ' + self
.image_path
119 #print self.name, ': command:', command
120 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
121 content
= stderr
.read()
123 self
.logger
.error("command: '%s' stderr: '%s'", command
, content
)
125 command
= 'cat ' + self
.image_path
+ '/.openvim.yaml'
126 #print self.name, ': command:', command
127 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
128 content
= stdout
.read()
129 if len(content
) == 0:
130 self
.logger
.error("command: '%s' stderr='%s'", command
, stderr
.read())
131 raise paramiko
.ssh_exception
.SSHException("Error empty file, command: '{}'".format(command
))
132 self
.localinfo
= yaml
.load(content
)
133 js_v(self
.localinfo
, localinfo_schema
)
134 self
.localinfo_dirty
= False
135 if 'server_files' not in self
.localinfo
:
136 self
.localinfo
['server_files'] = {}
137 self
.logger
.debug("localinfo load from host")
140 except paramiko
.ssh_exception
.SSHException
as e
:
142 self
.logger
.error("load_localinfo ssh Exception: " + text
)
143 except host_thread
.lvirt_module
.libvirtError
as e
:
144 text
= e
.get_error_message()
145 self
.logger
.error("load_localinfo libvirt Exception: " + text
)
146 except yaml
.YAMLError
as exc
:
148 if hasattr(exc
, 'problem_mark'):
149 mark
= exc
.problem_mark
150 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
151 self
.logger
.error("load_localinfo yaml format Exception " + text
)
152 except js_e
.ValidationError
as e
:
154 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
155 self
.logger
.error("load_localinfo format Exception: %s %s", text
, str(e
))
156 except Exception as e
:
158 self
.logger
.error("load_localinfo Exception: " + text
)
160 #not loaded, insert a default data and force saving by activating dirty flag
161 self
.localinfo
= {'files':{}, 'server_files':{} }
162 #self.localinfo_dirty=True
163 self
.localinfo_dirty
=False
165 def load_hostinfo(self
):
173 command
= 'cat ' + self
.image_path
+ '/hostinfo.yaml'
174 #print self.name, ': command:', command
175 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
176 content
= stdout
.read()
177 if len(content
) == 0:
178 self
.logger
.error("command: '%s' stderr: '%s'", command
, stderr
.read())
179 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
180 self
.hostinfo
= yaml
.load(content
)
181 js_v(self
.hostinfo
, hostinfo_schema
)
182 self
.logger
.debug("hostlinfo load from host " + str(self
.hostinfo
))
185 except paramiko
.ssh_exception
.SSHException
as e
:
187 self
.logger
.error("load_hostinfo ssh Exception: " + text
)
188 except host_thread
.lvirt_module
.libvirtError
as e
:
189 text
= e
.get_error_message()
190 self
.logger
.error("load_hostinfo libvirt Exception: " + text
)
191 except yaml
.YAMLError
as exc
:
193 if hasattr(exc
, 'problem_mark'):
194 mark
= exc
.problem_mark
195 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
196 self
.logger
.error("load_hostinfo yaml format Exception " + text
)
197 except js_e
.ValidationError
as e
:
199 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
200 self
.logger
.error("load_hostinfo format Exception: %s %s", text
, e
.message
)
201 except Exception as e
:
203 self
.logger
.error("load_hostinfo Exception: " + text
)
205 #not loaded, insert a default data
208 def save_localinfo(self
, tries
=3):
210 self
.localinfo_dirty
= False
217 command
= 'cat > ' + self
.image_path
+ '/.openvim.yaml'
218 self
.logger
.debug("command:" + command
)
219 (stdin
, _
, _
) = self
.ssh_conn
.exec_command(command
)
220 yaml
.safe_dump(self
.localinfo
, stdin
, explicit_start
=True, indent
=4, default_flow_style
=False, tags
=False, encoding
='utf-8', allow_unicode
=True)
221 self
.localinfo_dirty
= False
224 except paramiko
.ssh_exception
.SSHException
as e
:
226 self
.logger
.error("save_localinfo ssh Exception: " + text
)
227 if "SSH session not active" in text
:
229 except host_thread
.lvirt_module
.libvirtError
as e
:
230 text
= e
.get_error_message()
231 self
.logger
.error("save_localinfo libvirt Exception: " + text
)
232 except yaml
.YAMLError
as exc
:
234 if hasattr(exc
, 'problem_mark'):
235 mark
= exc
.problem_mark
236 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
237 self
.logger
.error("save_localinfo yaml format Exception " + text
)
238 except Exception as e
:
240 self
.logger
.error("save_localinfo Exception: " + text
)
242 def load_servers_from_db(self
):
243 self
.db_lock
.acquire()
244 r
,c
= self
.db
.get_table(SELECT
=('uuid','status', 'image_id'), FROM
='instances', WHERE
={'host_id': self
.host_id
})
245 self
.db_lock
.release()
247 self
.server_status
= {}
249 self
.logger
.error("Error getting data from database: " + c
)
252 self
.server_status
[ server
['uuid'] ] = server
['status']
254 #convert from old version to new one
255 if 'inc_files' in self
.localinfo
and server
['uuid'] in self
.localinfo
['inc_files']:
256 server_files_dict
= {'source file': self
.localinfo
['inc_files'][ server
['uuid'] ] [0], 'file format':'raw' }
257 if server_files_dict
['source file'][-5:] == 'qcow2':
258 server_files_dict
['file format'] = 'qcow2'
260 self
.localinfo
['server_files'][ server
['uuid'] ] = { server
['image_id'] : server_files_dict
}
261 if 'inc_files' in self
.localinfo
:
262 del self
.localinfo
['inc_files']
263 self
.localinfo_dirty
= True
265 def delete_unused_files(self
):
266 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
267 Deletes unused entries at self.loacalinfo and the corresponding local files.
268 The only reason for this mismatch is the manual deletion of instances (VM) at database
272 for uuid
,images
in self
.localinfo
['server_files'].items():
273 if uuid
not in self
.server_status
:
274 for localfile
in images
.values():
276 self
.logger
.debug("deleting file '%s' of unused server '%s'", localfile
['source file'], uuid
)
277 self
.delete_file(localfile
['source file'])
278 except paramiko
.ssh_exception
.SSHException
as e
:
279 self
.logger
.error("Exception deleting file '%s': %s", localfile
['source file'], str(e
))
280 del self
.localinfo
['server_files'][uuid
]
281 self
.localinfo_dirty
= True
283 def insert_task(self
, task
, *aditional
):
285 self
.queueLock
.acquire()
286 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
287 self
.queueLock
.release()
290 return -1, "timeout inserting a task over host " + self
.name
294 self
.load_localinfo()
296 self
.load_servers_from_db()
297 self
.delete_unused_files()
300 self
.queueLock
.acquire()
301 if not self
.taskQueue
.empty():
302 task
= self
.taskQueue
.get()
305 self
.queueLock
.release()
309 if self
.localinfo_dirty
:
310 self
.save_localinfo()
311 elif self
.next_update_server_status
< now
:
312 self
.update_servers_status()
313 self
.next_update_server_status
= now
+ 5
314 elif len(self
.pending_terminate_server
)>0 and self
.pending_terminate_server
[0][0]<now
:
315 self
.server_forceoff()
320 if task
[0] == 'instance':
321 self
.logger
.debug("processing task instance " + str(task
[1]['action']))
325 r
= self
.action_on_server(task
[1], retry
==2)
328 elif task
[0] == 'image':
330 elif task
[0] == 'exit':
331 self
.logger
.debug("processing task exit")
334 elif task
[0] == 'reload':
335 self
.logger
.debug("processing task reload terminating and relaunching")
338 elif task
[0] == 'edit-iface':
339 self
.logger
.debug("processing task edit-iface port={}, old_net={}, new_net={}".format(
340 task
[1], task
[2], task
[3]))
341 self
.edit_iface(task
[1], task
[2], task
[3])
342 elif task
[0] == 'restore-iface':
343 self
.logger
.debug("processing task restore-iface={} mac={}".format(task
[1], task
[2]))
344 self
.restore_iface(task
[1], task
[2])
345 elif task
[0] == 'new-ovsbridge':
346 self
.logger
.debug("Creating compute OVS bridge")
347 self
.create_ovs_bridge()
348 elif task
[0] == 'new-vxlan':
349 self
.logger
.debug("Creating vxlan tunnel='{}', remote ip='{}'".format(task
[1], task
[2]))
350 self
.create_ovs_vxlan_tunnel(task
[1], task
[2])
351 elif task
[0] == 'del-ovsbridge':
352 self
.logger
.debug("Deleting OVS bridge")
353 self
.delete_ovs_bridge()
354 elif task
[0] == 'del-vxlan':
355 self
.logger
.debug("Deleting vxlan {} tunnel".format(task
[1]))
356 self
.delete_ovs_vxlan_tunnel(task
[1])
357 elif task
[0] == 'create-ovs-bridge-port':
358 self
.logger
.debug("Adding port ovim-{} to OVS bridge".format(task
[1]))
359 self
.create_ovs_bridge_port(task
[1])
360 elif task
[0] == 'del-ovs-port':
361 self
.logger
.debug("Delete bridge attached to ovs port vlan {} net {}".format(task
[1], task
[2]))
362 self
.delete_bridge_port_attached_to_ovs(task
[1], task
[2])
364 self
.logger
.debug("unknown task " + str(task
))
366 except Exception as e
:
367 self
.logger
.critical("Unexpected exception at run: " + str(e
), exc_info
=True)
369 def server_forceoff(self
, wait_until_finished
=False):
370 while len(self
.pending_terminate_server
)>0:
372 if self
.pending_terminate_server
[0][0]>now
:
373 if wait_until_finished
:
378 req
={'uuid':self
.pending_terminate_server
[0][1],
379 'action':{'terminate':'force'},
382 self
.action_on_server(req
)
383 self
.pending_terminate_server
.pop(0)
387 self
.server_forceoff(True)
388 if self
.localinfo_dirty
:
389 self
.save_localinfo()
391 self
.ssh_conn
.close()
392 except Exception as e
:
394 self
.logger
.error("terminate Exception: " + text
)
395 self
.logger
.debug("exit from host_thread")
397 def get_local_iface_name(self
, generic_name
):
398 if self
.hostinfo
!= None and "iface_names" in self
.hostinfo
and generic_name
in self
.hostinfo
["iface_names"]:
399 return self
.hostinfo
["iface_names"][generic_name
]
402 def create_xml_server(self
, server
, dev_list
, server_metadata
={}):
403 """Function that implements the generation of the VM XML definition.
404 Additional devices are in dev_list list
405 The main disk is upon dev_list[0]"""
407 #get if operating system is Windows
409 os_type
= server_metadata
.get('os_type', None)
410 if os_type
== None and 'metadata' in dev_list
[0]:
411 os_type
= dev_list
[0]['metadata'].get('os_type', None)
412 if os_type
!= None and os_type
.lower() == "windows":
414 #get type of hard disk bus
415 bus_ide
= True if windows_os
else False
416 bus
= server_metadata
.get('bus', None)
417 if bus
== None and 'metadata' in dev_list
[0]:
418 bus
= dev_list
[0]['metadata'].get('bus', None)
420 bus_ide
= True if bus
=='ide' else False
424 text
= "<domain type='kvm'>"
426 topo
= server_metadata
.get('topology', None)
427 if topo
== None and 'metadata' in dev_list
[0]:
428 topo
= dev_list
[0]['metadata'].get('topology', None)
430 name
= server
.get('name','') + "_" + server
['uuid']
431 name
= name
[:58] #qemu impose a length limit of 59 chars or not start. Using 58
432 text
+= self
.inc_tab() + "<name>" + name
+ "</name>"
434 text
+= self
.tab() + "<uuid>" + server
['uuid'] + "</uuid>"
437 if 'extended' in server
and server
['extended']!=None and 'numas' in server
['extended']:
438 numa
= server
['extended']['numas'][0]
441 memory
= int(numa
.get('memory',0))*1024*1024 #in KiB
443 memory
= int(server
['ram'])*1024;
445 if not self
.develop_mode
:
448 return -1, 'No memory assigned to instance'
450 text
+= self
.tab() + "<memory unit='KiB'>" +memory
+"</memory>"
451 text
+= self
.tab() + "<currentMemory unit='KiB'>" +memory
+ "</currentMemory>"
453 text
+= self
.tab()+'<memoryBacking>'+ \
454 self
.inc_tab() + '<hugepages/>'+ \
455 self
.dec_tab()+ '</memoryBacking>'
458 use_cpu_pinning
=False
459 vcpus
= int(server
.get("vcpus",0))
461 if 'cores-source' in numa
:
463 for index
in range(0, len(numa
['cores-source'])):
464 cpu_pinning
.append( [ numa
['cores-id'][index
], numa
['cores-source'][index
] ] )
466 if 'threads-source' in numa
:
468 for index
in range(0, len(numa
['threads-source'])):
469 cpu_pinning
.append( [ numa
['threads-id'][index
], numa
['threads-source'][index
] ] )
471 if 'paired-threads-source' in numa
:
473 for index
in range(0, len(numa
['paired-threads-source'])):
474 cpu_pinning
.append( [numa
['paired-threads-id'][index
][0], numa
['paired-threads-source'][index
][0] ] )
475 cpu_pinning
.append( [numa
['paired-threads-id'][index
][1], numa
['paired-threads-source'][index
][1] ] )
478 if use_cpu_pinning
and not self
.develop_mode
:
479 text
+= self
.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning
)) +"</vcpu>" + \
480 self
.tab()+'<cputune>'
482 for i
in range(0, len(cpu_pinning
)):
483 text
+= self
.tab() + "<vcpupin vcpu='" +str(cpu_pinning
[i
][0])+ "' cpuset='" +str(cpu_pinning
[i
][1]) +"'/>"
484 text
+= self
.dec_tab()+'</cputune>'+ \
485 self
.tab() + '<numatune>' +\
486 self
.inc_tab() + "<memory mode='strict' nodeset='" +str(numa
['source'])+ "'/>" +\
487 self
.dec_tab() + '</numatune>'
490 return -1, "Instance without number of cpus"
491 text
+= self
.tab()+"<vcpu>" + str(vcpus
) + "</vcpu>"
496 if dev
['type']=='cdrom' :
499 text
+= self
.tab()+ '<os>' + \
500 self
.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
502 text
+= self
.tab() + "<boot dev='cdrom'/>"
503 text
+= self
.tab() + "<boot dev='hd'/>" + \
504 self
.dec_tab()+'</os>'
506 text
+= self
.tab()+'<features>'+\
507 self
.inc_tab()+'<acpi/>' +\
508 self
.tab()+'<apic/>' +\
509 self
.tab()+'<pae/>'+ \
510 self
.dec_tab() +'</features>'
511 if topo
== "oneSocket:hyperthreading":
513 return -1, 'Cannot expose hyperthreading with an odd number of vcpus'
514 text
+= self
.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='2' /> </cpu>" % vcpus
/2
515 elif windows_os
or topo
== "oneSocket":
516 text
+= self
.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>" % vcpus
518 text
+= self
.tab() + "<cpu mode='host-model'></cpu>"
519 text
+= self
.tab() + "<clock offset='utc'/>" +\
520 self
.tab() + "<on_poweroff>preserve</on_poweroff>" + \
521 self
.tab() + "<on_reboot>restart</on_reboot>" + \
522 self
.tab() + "<on_crash>restart</on_crash>"
523 text
+= self
.tab() + "<devices>" + \
524 self
.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
525 self
.tab() + "<serial type='pty'>" +\
526 self
.inc_tab() + "<target port='0'/>" + \
527 self
.dec_tab() + "</serial>" +\
528 self
.tab() + "<console type='pty'>" + \
529 self
.inc_tab()+ "<target type='serial' port='0'/>" + \
530 self
.dec_tab()+'</console>'
532 text
+= self
.tab() + "<controller type='usb' index='0'/>" + \
533 self
.tab() + "<controller type='ide' index='0'/>" + \
534 self
.tab() + "<input type='mouse' bus='ps2'/>" + \
535 self
.tab() + "<sound model='ich6'/>" + \
536 self
.tab() + "<video>" + \
537 self
.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
538 self
.dec_tab() + "</video>" + \
539 self
.tab() + "<memballoon model='virtio'/>" + \
540 self
.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
542 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
543 #> self.dec_tab()+'</hostdev>\n' +\
544 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
546 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
548 #If image contains 'GRAPH' include graphics
549 #if 'GRAPH' in image:
550 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
551 self
.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
552 self
.dec_tab() + "</graphics>"
556 bus_ide_dev
= bus_ide
557 if dev
['type']=='cdrom' or dev
['type']=='disk':
558 if dev
['type']=='cdrom':
560 text
+= self
.tab() + "<disk type='file' device='"+dev
['type']+"'>"
561 if 'file format' in dev
:
562 text
+= self
.inc_tab() + "<driver name='qemu' type='" +dev
['file format']+ "' cache='writethrough'/>"
563 if 'source file' in dev
:
564 text
+= self
.tab() + "<source file='" +dev
['source file']+ "'/>"
565 #elif v['type'] == 'block':
566 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
568 # return -1, 'Unknown disk type ' + v['type']
569 vpci
= dev
.get('vpci',None)
571 vpci
= dev
['metadata'].get('vpci',None)
572 text
+= self
.pci2xml(vpci
)
575 text
+= self
.tab() + "<target dev='hd" +vd_index
+ "' bus='ide'/>" #TODO allows several type of disks
577 text
+= self
.tab() + "<target dev='vd" +vd_index
+ "' bus='virtio'/>"
578 text
+= self
.dec_tab() + '</disk>'
579 vd_index
= chr(ord(vd_index
)+1)
580 elif dev
['type']=='xml':
581 dev_text
= dev
['xml']
583 dev_text
= dev_text
.replace('__vpci__', dev
['vpci'])
584 if 'source file' in dev
:
585 dev_text
= dev_text
.replace('__file__', dev
['source file'])
586 if 'file format' in dev
:
587 dev_text
= dev_text
.replace('__format__', dev
['source file'])
588 if '__dev__' in dev_text
:
589 dev_text
= dev_text
.replace('__dev__', vd_index
)
590 vd_index
= chr(ord(vd_index
)+1)
593 return -1, 'Unknown device type ' + dev
['type']
596 bridge_interfaces
= server
.get('networks', [])
597 for v
in bridge_interfaces
:
599 self
.db_lock
.acquire()
600 result
, content
= self
.db
.get_table(FROM
='nets', SELECT
=('provider',),WHERE
={'uuid':v
['net_id']} )
601 self
.db_lock
.release()
603 self
.logger
.error("create_xml_server ERROR %d getting nets %s", result
, content
)
605 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
606 #I know it is not secure
607 #for v in sorted(desc['network interfaces'].itervalues()):
608 model
= v
.get("model", None)
609 if content
[0]['provider']=='default':
610 text
+= self
.tab() + "<interface type='network'>" + \
611 self
.inc_tab() + "<source network='" +content
[0]['provider']+ "'/>"
612 elif content
[0]['provider'][0:7]=='macvtap':
613 text
+= self
.tab()+"<interface type='direct'>" + \
614 self
.inc_tab() + "<source dev='" + self
.get_local_iface_name(content
[0]['provider'][8:]) + "' mode='bridge'/>" + \
615 self
.tab() + "<target dev='macvtap0'/>"
617 text
+= self
.tab() + "<alias name='net" + str(net_nb
) + "'/>"
620 elif content
[0]['provider'][0:6]=='bridge':
621 text
+= self
.tab() + "<interface type='bridge'>" + \
622 self
.inc_tab()+"<source bridge='" +self
.get_local_iface_name(content
[0]['provider'][7:])+ "'/>"
624 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
625 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
628 elif content
[0]['provider'][0:3] == "OVS":
629 vlan
= content
[0]['provider'].replace('OVS:', '')
630 text
+= self
.tab() + "<interface type='bridge'>" + \
631 self
.inc_tab() + "<source bridge='ovim-" + str(vlan
) + "'/>"
633 return -1, 'Unknown Bridge net provider ' + content
[0]['provider']
635 text
+= self
.tab() + "<model type='" +model
+ "'/>"
636 if v
.get('mac_address', None) != None:
637 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
638 text
+= self
.pci2xml(v
.get('vpci',None))
639 text
+= self
.dec_tab()+'</interface>'
643 interfaces
= numa
.get('interfaces', [])
647 if self
.develop_mode
: #map these interfaces to bridges
648 text
+= self
.tab() + "<interface type='bridge'>" + \
649 self
.inc_tab()+"<source bridge='" +self
.develop_bridge_iface
+ "'/>"
651 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
652 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
654 text
+= self
.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
655 if v
.get('mac_address', None) != None:
656 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
657 text
+= self
.pci2xml(v
.get('vpci',None))
658 text
+= self
.dec_tab()+'</interface>'
661 if v
['dedicated'] == 'yes': #passthrought
662 text
+= self
.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
663 self
.inc_tab() + "<source>"
665 text
+= self
.pci2xml(v
['source'])
666 text
+= self
.dec_tab()+'</source>'
667 text
+= self
.pci2xml(v
.get('vpci',None))
669 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
670 text
+= self
.dec_tab()+'</hostdev>'
672 else: #sriov_interfaces
673 #skip not connected interfaces
674 if v
.get("net_id") == None:
676 text
+= self
.tab() + "<interface type='hostdev' managed='yes'>"
678 if v
.get('mac_address', None) != None:
679 text
+= self
.tab() + "<mac address='" +v
['mac_address']+ "'/>"
680 text
+= self
.tab()+'<source>'
682 text
+= self
.pci2xml(v
['source'])
683 text
+= self
.dec_tab()+'</source>'
684 if v
.get('vlan',None) != None:
685 text
+= self
.tab() + "<vlan> <tag id='" + str(v
['vlan']) + "'/> </vlan>"
686 text
+= self
.pci2xml(v
.get('vpci',None))
688 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
689 text
+= self
.dec_tab()+'</interface>'
692 text
+= self
.dec_tab()+'</devices>'+\
693 self
.dec_tab()+'</domain>'
696 def pci2xml(self
, pci
):
697 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
698 alows an empty pci text'''
701 first_part
= pci
.split(':')
702 second_part
= first_part
[2].split('.')
703 return self
.tab() + "<address type='pci' domain='0x" + first_part
[0] + \
704 "' bus='0x" + first_part
[1] + "' slot='0x" + second_part
[0] + \
705 "' function='0x" + second_part
[1] + "'/>"
708 """Return indentation according to xml_level"""
709 return "\n" + (' '*self
.xml_level
)
712 """Increment and return indentation according to xml_level"""
717 """Decrement and return indentation according to xml_level"""
721 def create_ovs_bridge(self
):
723 Create a bridge in compute OVS to allocate VMs
724 :return: True if success
729 command
= 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
730 self
.logger
.debug("command: " + command
)
731 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
732 content
= stdout
.read()
733 if len(content
) == 0:
737 except paramiko
.ssh_exception
.SSHException
as e
:
738 self
.logger
.error("create_ovs_bridge ssh Exception: " + str(e
))
739 if "SSH session not active" in str(e
):
743 def delete_port_to_ovs_bridge(self
, vlan
, net_uuid
):
745 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
746 :param vlan: vlan port id
747 :param net_uuid: network id
754 port_name
= 'ovim-' + str(vlan
)
755 command
= 'sudo ovs-vsctl del-port br-int ' + port_name
756 self
.logger
.debug("command: " + command
)
757 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
758 content
= stdout
.read()
759 if len(content
) == 0:
763 except paramiko
.ssh_exception
.SSHException
as e
:
764 self
.logger
.error("delete_port_to_ovs_bridge ssh Exception: " + str(e
))
765 if "SSH session not active" in str(e
):
769 def delete_dhcp_server(self
, vlan
, net_uuid
, dhcp_path
):
771 Delete dhcp server process lining in namespace
772 :param vlan: segmentation id
773 :param net_uuid: network uuid
774 :param dhcp_path: conf fiel path that live in namespace side
779 if not self
.is_dhcp_port_free(vlan
, net_uuid
):
782 net_namespace
= 'ovim-' + str(vlan
)
783 dhcp_path
= os
.path
.join(dhcp_path
, net_namespace
)
784 pid_file
= os
.path
.join(dhcp_path
, 'dnsmasq.pid')
786 command
= 'sudo ip netns exec ' + net_namespace
+ ' cat ' + pid_file
787 self
.logger
.debug("command: " + command
)
788 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
789 content
= stdout
.read()
791 command
= 'sudo ip netns exec ' + net_namespace
+ ' kill -9 ' + content
792 self
.logger
.debug("command: " + command
)
793 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
794 content
= stdout
.read()
796 # if len(content) == 0:
800 except paramiko
.ssh_exception
.SSHException
as e
:
801 self
.logger
.error("delete_dhcp_server ssh Exception: " + str(e
))
802 if "SSH session not active" in str(e
):
806 def is_dhcp_port_free(self
, host_id
, net_uuid
):
808 Check if any port attached to the a net in a vxlan mesh across computes nodes
809 :param host_id: host id
810 :param net_uuid: network id
811 :return: True if is not free
813 self
.db_lock
.acquire()
814 result
, content
= self
.db
.get_table(
816 WHERE
={'type': 'instance:ovs', 'net_id': net_uuid
}
818 self
.db_lock
.release()
825 def is_port_free(self
, host_id
, net_uuid
):
827 Check if there not ovs ports of a network in a compute host.
828 :param host_id: host id
829 :param net_uuid: network id
830 :return: True if is not free
833 self
.db_lock
.acquire()
834 result
, content
= self
.db
.get_table(
835 FROM
='ports as p join instances as i on p.instance_id=i.uuid',
836 WHERE
={"i.host_id": self
.host_id
, 'p.type': 'instance:ovs', 'p.net_id': net_uuid
}
838 self
.db_lock
.release()
845 def add_port_to_ovs_bridge(self
, vlan
):
847 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
848 :param vlan: vlan port id
849 :return: True if success
855 port_name
= 'ovim-' + str(vlan
)
856 command
= 'sudo ovs-vsctl add-port br-int ' + port_name
+ ' tag=' + str(vlan
)
857 self
.logger
.debug("command: " + command
)
858 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
859 content
= stdout
.read()
860 if len(content
) == 0:
864 except paramiko
.ssh_exception
.SSHException
as e
:
865 self
.logger
.error("add_port_to_ovs_bridge ssh Exception: " + str(e
))
866 if "SSH session not active" in str(e
):
870 def delete_dhcp_port(self
, vlan
, net_uuid
):
872 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
873 :param vlan: segmentation id
874 :param net_uuid: network id
875 :return: True if success
881 if not self
.is_dhcp_port_free(vlan
, net_uuid
):
883 self
.delete_dhcp_interfaces(vlan
)
886 def delete_bridge_port_attached_to_ovs(self
, vlan
, net_uuid
):
888 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
891 :return: True if success
896 if not self
.is_port_free(vlan
, net_uuid
):
898 self
.delete_port_to_ovs_bridge(vlan
, net_uuid
)
899 self
.delete_linux_bridge(vlan
)
902 def delete_linux_bridge(self
, vlan
):
904 Delete a linux bridge in a scpecific compute.
905 :param vlan: vlan port id
906 :return: True if success
912 port_name
= 'ovim-' + str(vlan
)
913 command
= 'sudo ip link set dev veth0-' + str(vlan
) + ' down'
914 self
.logger
.debug("command: " + command
)
915 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
916 # content = stdout.read()
918 # if len(content) != 0:
920 command
= 'sudo ifconfig ' + port_name
+ ' down && sudo brctl delbr ' + port_name
921 self
.logger
.debug("command: " + command
)
922 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
923 content
= stdout
.read()
924 if len(content
) == 0:
928 except paramiko
.ssh_exception
.SSHException
as e
:
929 self
.logger
.error("delete_linux_bridge ssh Exception: " + str(e
))
930 if "SSH session not active" in str(e
):
934 def create_ovs_bridge_port(self
, vlan
):
936 Generate a linux bridge and attache the port to a OVS bridge
937 :param vlan: vlan port id
942 self
.create_linux_bridge(vlan
)
943 self
.add_port_to_ovs_bridge(vlan
)
945 def create_linux_bridge(self
, vlan
):
947 Create a linux bridge with STP active
948 :param vlan: netowrk vlan id
955 port_name
= 'ovim-' + str(vlan
)
956 command
= 'sudo brctl show | grep ' + port_name
957 self
.logger
.debug("command: " + command
)
958 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
959 content
= stdout
.read()
961 # if exist nothing to create
962 # if len(content) == 0:
965 command
= 'sudo brctl addbr ' + port_name
966 self
.logger
.debug("command: " + command
)
967 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
968 content
= stdout
.read()
970 # if len(content) == 0:
975 command
= 'sudo brctl stp ' + port_name
+ ' on'
976 self
.logger
.debug("command: " + command
)
977 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
978 content
= stdout
.read()
980 # if len(content) == 0:
984 command
= 'sudo ip link set dev ' + port_name
+ ' up'
985 self
.logger
.debug("command: " + command
)
986 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
987 content
= stdout
.read()
989 if len(content
) == 0:
993 except paramiko
.ssh_exception
.SSHException
as e
:
994 self
.logger
.error("create_linux_bridge ssh Exception: " + str(e
))
995 if "SSH session not active" in str(e
):
    def set_mac_dhcp_server(self, ip, mac, vlan, netmask, dhcp_path):
        """Append a dnsmasq host rule binding a fixed IP to a specific MAC address.

        The rule is written inside the per-VLAN network namespace over the SSH
        connection to the host.
        :param ip: IP address assigned to a VM
        :param mac: VM vnic MAC to be matched with the IP received
        :param vlan: segmentation id (used to derive the 'ovim-<vlan>' namespace)
        :param netmask: netmask value (not used by the commands below)
        :param dhcp_path: dhcp conf file path that lives in namespace side
        :return: True if success
        """
        # NOTE(review): several control-flow lines were elided in the mangled
        # source; they are restored here from the pattern of sibling methods.
        if self.test:
            return True

        net_namespace = 'ovim-' + str(vlan)
        # NOTE(review): dhcp_path is joined with the namespace name twice, so the
        # hosts file ends up at <dhcp_path>/<ns>/<ns> — looks suspicious; confirm.
        dhcp_path = os.path.join(dhcp_path, net_namespace)
        dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
        try:
            ip_data = mac.upper() + ',' + ip

            # make sure the hosts file exists before appending to it
            command = 'sudo ip netns exec ' + net_namespace + ' touch ' + dhcp_hostsdir
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' sudo bash -ec "echo ' + ip_data + ' >> ' + dhcp_hostsdir + '"'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # empty stdout is taken as success (pattern used throughout this class)
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()  # try to recover the dropped SSH session
            return False
1043 def delete_mac_dhcp_server(self
, ip
, mac
, vlan
, dhcp_path
):
1045 Delete into dhcp conf file the ip assigned to a specific MAC address
1047 :param ip: IP address asigned to a VM
1048 :param mac: VM vnic mac to be macthed with the IP received
1049 :param vlan: Segmentation id
1050 :param dhcp_path: dhcp conf file path that live in namespace side
1057 net_namespace
= 'ovim-' + str(vlan
)
1058 dhcp_path
= os
.path
.join(dhcp_path
, net_namespace
)
1059 dhcp_hostsdir
= os
.path
.join(dhcp_path
, net_namespace
)
1064 ip_data
= mac
.upper() + ',' + ip
1066 command
= 'sudo ip netns exec ' + net_namespace
+ ' sudo sed -i \'/' + ip_data
+ '/d\' ' + dhcp_hostsdir
1067 self
.logger
.debug("command: " + command
)
1068 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1069 content
= stdout
.read()
1071 if len(content
) == 0:
1076 except paramiko
.ssh_exception
.SSHException
as e
:
1077 self
.logger
.error("set_mac_dhcp_server ssh Exception: " + str(e
))
1078 if "SSH session not active" in str(e
):
    def launch_dhcp_server(self, vlan, ip_range, netmask, dhcp_path, gateway):
        """Launch a dnsmasq DHCP server inside the per-VLAN network namespace.

        (The original docstring said "Generate a linux bridge and attache the
        port to a OVS bridge" — a copy/paste error from another method.)
        :param vlan: segmentation id
        :param ip_range: two-element sequence [first_ip, last_ip] of the dhcp range
        :param netmask: network netmask
        :param dhcp_path: dhcp conf file path that lives in namespace side
        :param gateway: gateway address for dhcp net (dnsmasq listen address)
        :return: True if success
        """
        # NOTE(review): elided lines restored from the sibling-method pattern.
        if self.test:
            return True

        interface = 'tap-' + str(vlan)
        net_namespace = 'ovim-' + str(vlan)
        dhcp_path = os.path.join(dhcp_path, net_namespace)
        leases_path = os.path.join(dhcp_path, "dnsmasq.leases")
        pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')

        dhcp_range = ip_range[0] + ',' + ip_range[1] + ',' + netmask
        try:
            # create the working directory for dnsmasq inside the namespace
            command = 'sudo ip netns exec ' + net_namespace + ' mkdir -p ' + dhcp_path
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # read a previously stored dnsmasq pid, if any
            pid_path = os.path.join(dhcp_path, 'dnsmasq.pid')
            command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_path
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()
            # check if pid is runing
            # NOTE(review): if the pid file was empty/missing, this grep runs
            # with no argument — confirm intended behaviour upstream.
            pid_status_path = content
            command = "ps aux | awk '{print $2 }' | grep " + pid_status_path
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \
                      '--interface=' + interface + ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path + \
                      ' --dhcp-range ' + dhcp_range + ' --pid-file=' + pid_file + ' --dhcp-leasefile=' + leases_path + \
                      ' --listen-address ' + gateway

            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.readline()  # dnsmasq prints to stdout only on error

            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("launch_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()  # try to recover the dropped SSH session
            return False
    def delete_dhcp_interfaces(self, vlan):
        """Tear down the veth pair used by the per-VLAN DHCP namespace.

        (The original docstring said "Create a linux bridge with STP active" —
        a copy/paste error from another method.)
        :param vlan: network vlan id
        """
        # NOTE(review): elided lines restored from the sibling-method pattern.
        if self.test:
            return True

        net_namespace = 'ovim-' + str(vlan)
        try:
            # detach the host-side veth end from the OVS integration bridge
            command = 'sudo ovs-vsctl del-port br-int ovs-tap-' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # bring down the namespace-side veth end
            command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' down'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # bring down the host-side veth end
            # NOTE(review): the success path falls through returning None (no
            # explicit "return True" in the visible source) — callers must not
            # rely on a truthy result.
            command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' down'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("delete_dhcp_interfaces ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()  # try to recover the dropped SSH session
            return False
    def create_dhcp_interfaces(self, vlan, ip_listen_address, netmask):
        """Create the namespace and veth pair that carry DHCP traffic for a VLAN.

        Creates namespace 'ovim-<vlan>', a veth pair tap-<vlan>/ovs-tap-<vlan>,
        plugs the ovs- end into br-int tagged with the VLAN, moves the tap- end
        into the namespace and configures its address.
        (The original docstring said "Create a linux bridge with STP active" —
        a copy/paste error from another method.)
        :param vlan: segmentation id
        :param ip_listen_address: listen IP for the dhcp service (namespace-side tap)
        :param netmask: dhcp net CIDR
        :return: True if success
        """
        # NOTE(review): elided lines restored from the sibling-method pattern.
        if self.test:
            return True

        net_namespace = 'ovim-' + str(vlan)
        namespace_interface = 'tap-' + str(vlan)
        try:
            command = 'sudo ip netns add ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip link add tap-' + str(vlan) + ' type veth peer name ovs-tap-' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # plug the host-side end into the OVS integration bridge, VLAN-tagged
            command = 'sudo ovs-vsctl add-port br-int ovs-tap-' + str(vlan) + ' tag=' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # move the other end into the namespace
            command = 'sudo ip link set tap-' + str(vlan) + ' netns ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev lo up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # assign the DHCP listen address to the namespace-side interface
            command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig ' + namespace_interface \
                      + ' ' + ip_listen_address + ' netmask ' + netmask
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # only the last command's output is checked for success
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("create_dhcp_interfaces ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()  # try to recover the dropped SSH session
            return False
    def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
        """Create a vxlan tunnel between two computes with an OVS installed.

        STP path cost is also set at port level.
        :param vxlan_interface: vxlan interface name
        :param remote_ip: tunnel endpoint remote compute ip
        :return: True if success
        """
        # NOTE(review): elided lines restored from the sibling-method pattern.
        if self.test:
            return True
        try:
            command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
                      ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
                      ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # empty stdout is taken as success
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("create_ovs_vxlan_tunnel ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()  # try to recover the dropped SSH session
            return False
    def delete_ovs_vxlan_tunnel(self, vxlan_interface):
        """Delete a vxlan tunnel port from the OVS bridge.

        :param vxlan_interface: vxlan port name to be deleted
        :return: True if success
        """
        # NOTE(review): elided lines restored from the sibling-method pattern.
        if self.test:
            return True
        try:
            command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # empty stdout is taken as success
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("delete_ovs_vxlan_tunnel ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()  # try to recover the dropped SSH session
            return False
    def delete_ovs_bridge(self):
        """Delete the OVS integration bridge (br-int) from the compute.

        :return: True if success
        """
        # NOTE(review): elided lines restored from the sibling-method pattern.
        if self.test:
            return True
        try:
            command = 'sudo ovs-vsctl del-br br-int'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()
            # empty stdout is taken as success
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("delete_ovs_bridge ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()  # try to recover the dropped SSH session
            return False
    def get_file_info(self, path):
        """Stat a remote file via 'ls -lL' over SSH.

        :param path: remote file path
        :return: None if the file does not exist, otherwise the ls output split
                 on spaces: (permission, 1, owner, group, size, date, file)
        """
        command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
        self.logger.debug("command: " + command)
        (_, stdout, _) = self.ssh_conn.exec_command(command)
        content = stdout.read()
        if len(content) == 0:
            return None  # file does not exist
        else:
            return content.split(" ")  # (permission, 1, owner, group, size, date, file)
    def qemu_get_info(self, path):
        """Return 'qemu-img info' output for a remote image, parsed as YAML.

        :param path: remote image path
        :return: dict with qemu-img fields (e.g. 'file format', 'backing file')
        :raises paramiko.ssh_exception.SSHException: on empty output or bad YAML
        """
        command = 'qemu-img info ' + path
        self.logger.debug("command: " + command)
        (_, stdout, stderr) = self.ssh_conn.exec_command(command)
        content = stdout.read()
        if len(content) == 0:
            error = stderr.read()
            self.logger.error("get_qemu_info error " + error)
            raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
        else:
            try:
                # SECURITY NOTE(review): yaml.load without an explicit Loader can
                # construct arbitrary Python objects; the input comes from
                # qemu-img on a managed host, but yaml.safe_load would be safer.
                return yaml.load(content)
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                self.logger.error("get_qemu_info yaml format Exception " + text)
                raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
    def qemu_change_backing(self, inc_file, new_backing_file):
        """Repoint the backing file of incremental image `inc_file` (qemu-img rebase -u).

        :param inc_file: incremental (qcow2) image path on the host
        :param new_backing_file: new backing image path
        :return: 0 on success, -1 on error
        """
        command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
        self.logger.debug("command: " + command)
        (_, _, stderr) = self.ssh_conn.exec_command(command)
        content = stderr.read()
        # any stderr output means the rebase failed
        if len(content) == 0:
            return 0
        else:
            self.logger.error("qemu_change_backing error: " + content)
            return -1
    def get_notused_filename(self, proposed_name, suffix=''):
        '''Look for a non existing file_name in the host
        proposed_name: proposed file name, includes path
        suffix: suffix to be added to the name, before the extention
        '''
        extension = proposed_name.rfind(".")
        slash = proposed_name.rfind("/")
        if extension < 0 or extension < slash:  # no extension
            extension = len(proposed_name)
        # insert the suffix just before the extension
        target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
        info = self.get_file_info(target_name)
        if info is None:
            return target_name  # proposed name is free

        # otherwise append "-<index>" until an unused name is found
        index = 0
        while info is not None:
            target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
            index += 1
            info = self.get_file_info(target_name)
        return target_name
    def get_notused_path(self, proposed_path, suffix=''):
        '''Look for a non existing path at database for images
        proposed_path: proposed file name, includes path
        suffix: suffix to be added to the name, before the extention
        '''
        extension = proposed_path.rfind(".")
        if extension < 0:
            extension = len(proposed_path)
        if suffix != None:
            target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
        index = 0
        while True:
            # a path is free when no 'images' row references it
            # NOTE(review): DB accessed without self.db_lock here, unlike the
            # sibling methods — confirm this is intended.
            r, _ = self.db.get_table(FROM="images", WHERE={"path": target_path})
            if r <= 0:
                return target_path
            target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
            index += 1
    def delete_file(self, file_name):
        """Delete a file on the host.

        :param file_name: remote file path
        :raises paramiko.ssh_exception.SSHException: if 'rm' wrote to stderr
        """
        command = 'rm -f '+file_name
        self.logger.debug("command: " + command)
        (_, _, stderr) = self.ssh_conn.exec_command(command)
        error_msg = stderr.read()
        if len(error_msg) > 0:
            raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
    def copy_file(self, source, destination, perserve_time=True):
        """Copy an image to `destination` on the host.

        http(s) sources are downloaded with wget; local paths copied with cp.
        :param source: source path or http URL
        :param destination: destination path on the host
        :param perserve_time: keep source timestamps on the copy (cp only)
        :raises paramiko.ssh_exception.SSHException: if the command wrote to stderr
        """
        if source[0:4]=="http":
            # wget writes its log to <dst>.result; on failure that log is dumped
            # to stderr (caught below), on success it is removed
            command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
                dst=destination, src=source, dst_result=destination + ".result" )
        else:
            command = 'cp --no-preserve=mode'
            if perserve_time:
                command += ' --preserve=timestamps'
            command += " '{}' '{}'".format(source, destination)
        self.logger.debug("command: " + command)
        (_, _, stderr) = self.ssh_conn.exec_command(command)
        error_msg = stderr.read()
        if len(error_msg) > 0:
            raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
    def copy_remote_file(self, remote_file, use_incremental):
        ''' Copy a file from the repository to local folder and recursively
        copy the backing files in case the remote file is incremental
        Read and/or modified self.localinfo['files'] that contain the
        unmodified copies of images in the local path
        params:
            remote_file: path of remote file
            use_incremental: None (leave the decision to this function), True, False
        return:
            local_file: name of local file
            qemu_info: dict with qemu information of local file
            use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
        '''
        # NOTE(review): several elided lines were reconstructed from the visible
        # fragments — verify the branch bodies against upstream before relying
        # on exact behaviour.
        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None
        file_from_local = True

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        if remote_file[0:4] == "http":
            file_from_local = False
        if file_from_local:
            qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        #copy recursivelly the backing files
        if file_from_local and 'backing file' in qemu_remote_info:
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info = self.get_file_info(local_file)
            remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                local_file = None  # cached copy vanished from disk
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    pass
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None

        if local_file == None: #copy the file
            img_name = remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

            if use_incremental_out:
                # remember the pristine copy so later servers can reuse it
                self.localinfo['files'][remote_file] = local_file
            if new_backing_file:
                self.qemu_change_backing(local_file, new_backing_file)
            qemu_info = self.qemu_get_info(local_file)

        return local_file, qemu_info, use_incremental_out
    def launch_server(self, conn, server, rebuild=False, domain=None):
        """Create (or resume) a libvirt domain for `server` on this host.

        Copies the required images locally (optionally as incremental qcow2),
        builds the domain XML and starts it via `conn.createXML`.
        :param conn: open libvirt connection
        :param server: server dict, at least 'uuid', 'image_id', optional 'metadata', 'paused'
        :param rebuild: discard previous incremental files and re-create the domain
        :param domain: existing libvirt domain; when given and not rebuilding, it is resumed
        :return: (0, 'Success') or (negative error code, error text)
        """
        # NOTE(review): elided control-flow lines were reconstructed from the
        # visible fragments — verify against upstream.
        if self.test:
            time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
            return 0, 'Success'

        server_id = server['uuid']
        paused = server.get('paused','no')
        try:
            # resume path: domain already defined and not rebuilding
            if domain!=None and rebuild==False:
                domain.resume()
                #self.server_status[server_id] = 'ACTIVE'
                return 0, 'Success'

            self.db_lock.acquire()
            result, server_data = self.db.get_instance(server_id)
            self.db_lock.release()
            if result <= 0:
                self.logger.error("launch_server ERROR getting server from DB %d %s", result, server_data)
                return result, server_data

            #0: get image metadata
            server_metadata = server.get('metadata', {})
            use_incremental = None
            if "use_incremental" in server_metadata:
                use_incremental = False if server_metadata["use_incremental"] == "no" else True

            server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
            if rebuild:
                #delete previous incremental files
                for file_ in server_host_files.values():
                    self.delete_file(file_['source file'] )
                server_host_files={}

            #1: obtain aditional devices (disks)
            #Put as first device the main disk
            devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
            if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
                devices += server_data['extended']['devices']

            for dev in devices:
                if dev['image_id'] == None:
                    continue  # e.g. empty cdrom/volatile disk

                self.db_lock.acquire()
                result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'),
                                                    WHERE={'uuid': dev['image_id']})
                self.db_lock.release()
                if result <= 0:
                    error_text = "ERROR", result, content, "when getting image", dev['image_id']
                    self.logger.error("launch_server " + error_text)
                    return -1, error_text
                if content[0]['metadata'] is not None:
                    dev['metadata'] = json.loads(content[0]['metadata'])
                else:
                    dev['metadata'] = {}

                # image already copied for this server: reuse it
                if dev['image_id'] in server_host_files:
                    dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
                    dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
                    continue

                #2: copy image to host
                remote_file = content[0]['path']
                use_incremental_image = use_incremental
                if dev['metadata'].get("use_incremental") == "no":
                    use_incremental_image = False
                local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)

                #create incremental image
                if use_incremental_image:
                    local_file_inc = self.get_notused_filename(local_file, '.inc')
                    command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
                    self.logger.debug("command: " + command)
                    (_, _, stderr) = self.ssh_conn.exec_command(command)
                    error_msg = stderr.read()
                    if len(error_msg) > 0:
                        raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
                    local_file = local_file_inc
                    qemu_info = {'file format':'qcow2'}

                server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}

                dev['source file'] = local_file
                dev['file format'] = qemu_info['file format']

            self.localinfo['server_files'][ server['uuid'] ] = server_host_files
            self.localinfo_dirty = True  # persisted later by the thread loop

            #3: build the domain XML
            result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
            if result < 0:
                self.logger.error("create xml server error: " + xml)
                return -2, xml
            self.logger.debug("create xml: " + xml)
            atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
            #4: start the domain
            if not rebuild: #ensures that any pending destroying server is done
                self.server_forceoff(True)
            #self.logger.debug("launching instance " + xml)
            conn.createXML(xml, atribute)
            #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
            return 0, 'Success'

        except paramiko.ssh_exception.SSHException as e:
            text = str(e)
            self.logger.error("launch_server id='%s' ssh Exception: %s", server_id, text)
            if "SSH session not active" in text:
                self.ssh_connect()  # try to recover the dropped SSH session
        except host_thread.lvirt_module.libvirtError as e:
            text = e.get_error_message()
            self.logger.error("launch_server id='%s' libvirt Exception: %s", server_id, text)
        except Exception as e:
            text = str(e)
            self.logger.error("launch_server id='%s' Exception: %s", server_id, text)
        return -1, text
    def update_servers_status(self):
        """Poll libvirt for all domains and sync self.server_status / the DB.

        Skipped in test mode or when no server is tracked.
        """
        # virDomainState values for reference:
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7   #TODO suspended

        if self.test or len(self.server_status)==0:
            return

        try:
            conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
            domains = conn.listAllDomains()
            domain_dict = {}
            for domain in domains:
                uuid = domain.UUIDString() ;
                libvirt_status = domain.state()
                #print libvirt_status
                if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
                    new_status = "ACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
                    new_status = "PAUSED"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
                    new_status = "INACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
                    new_status = "ERROR"
                else:
                    new_status = None
                domain_dict[uuid] = new_status
            conn.close()
        except host_thread.lvirt_module.libvirtError as e:
            self.logger.error("get_state() Exception " + e.get_error_message())
            return

        # Python 2 dict iteration — file predates Python 3
        for server_id, current_status in self.server_status.iteritems():
            new_status = None
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
            else:
                # domain vanished from libvirt: treat as powered off
                new_status = "INACTIVE"

            if new_status == None or new_status == current_status:
                continue
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running
            # record status change
            self.logger.debug("server id='%s' status change from '%s' to '%s'", server_id, current_status, new_status)
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            if r>=0:
                self.server_status[server_id] = new_status
    def action_on_server(self, req, last_retry=True):
        '''Perform an action on a req
        Attributes:
            req: dictionary that contain:
                server properties: 'uuid','name','tenant_id','status'
                action: 'action' dict with one of terminate/shutoff/shutdown/
                        forceOff/start/resume/pause/reboot/rebuild/createImage
                host properties: 'user', 'ip_name'
            last_retry: when False and the action errors, the DB is not updated
                        (another retry will follow)
        return (error, text)
            0: No error. VM is updated to new state,
            -1: Invalid action, as trying to pause a PAUSED VM
            -2: Error accessing host
            -4: Error at DB access
            -5: Error while trying to perform action. VM is updated to ERROR
        '''
        # NOTE(review): this method had many lines elided in the mangled source;
        # branch bodies were reconstructed from the visible fragments and the
        # class-wide patterns — verify carefully against upstream.
        server_id = req['uuid']
        conn = None
        new_status = None
        old_status = req['status']
        last_error = None

        if self.test:
            # simulated transitions, no libvirt involved
            if 'terminate' in req['action']:
                new_status = 'deleted'
            elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
                if req['status']!='ERROR':
                    new_status = 'INACTIVE'
            elif 'start' in req['action'] and req['status']!='ERROR':
                new_status = 'ACTIVE'
            elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE':
                new_status = 'ACTIVE'
            elif 'pause' in req['action'] and req['status']!='ERROR':
                new_status = 'PAUSED'
            elif 'reboot' in req['action'] and req['status']!='ERROR':
                new_status = 'ACTIVE'
            elif 'rebuild' in req['action']:
                time.sleep(random.randint(20,150))
                new_status = 'ACTIVE'
            elif 'createImage' in req['action']:
                self.create_image(None, req)
        else:
            try:
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
                try:
                    dom = conn.lookupByUUIDString(server_id)
                except host_thread.lvirt_module.libvirtError as e:
                    text = e.get_error_message()
                    if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                        dom = None  # domain not defined on this host
                    else:
                        self.logger.error("action_on_server id='%s' libvirt exception: %s", server_id, text)
                        raise e

                if 'forceOff' in req['action']:
                    if dom == None:
                        self.logger.debug("action_on_server id='%s' domain not running", server_id)
                    else:
                        try:
                            self.logger.debug("sending DESTROY to server id='%s'", server_id)
                            dom.destroy()
                        except Exception as e:
                            # destroying an already-stopped domain is not an error
                            if "domain is not running" not in e.get_error_message():
                                self.logger.error("action_on_server id='%s' Exception while sending force off: %s",
                                                  server_id, e.get_error_message())
                                last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                                new_status = 'ERROR'

                elif 'terminate' in req['action']:
                    if dom == None:
                        self.logger.debug("action_on_server id='%s' domain not running", server_id)
                        new_status = 'deleted'
                    else:
                        try:
                            if req['action']['terminate'] == 'force':
                                self.logger.debug("sending DESTROY to server id='%s'", server_id)
                                dom.destroy()
                                new_status = 'deleted'
                            else:
                                self.logger.debug("sending SHUTDOWN to server id='%s'", server_id)
                                dom.shutdown()
                                # give the guest 10s before the thread forces it off
                                self.pending_terminate_server.append( (time.time()+10,server_id) )
                        except Exception as e:
                            self.logger.error("action_on_server id='%s' Exception while destroy: %s",
                                              server_id, e.get_error_message())
                            last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
                            new_status = 'ERROR'
                            if "domain is not running" in e.get_error_message():
                                try:
                                    dom.undefine()
                                    new_status = 'deleted'
                                except Exception:
                                    self.logger.error("action_on_server id='%s' Exception while undefine: %s",
                                                      server_id, e.get_error_message())
                                    last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
                                #Exception: 'virDomainDetachDevice() failed'
                    if new_status=='deleted':
                        # drop tracked status and local image copies of the server
                        if server_id in self.server_status:
                            del self.server_status[server_id]
                        if req['uuid'] in self.localinfo['server_files']:
                            for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
                                try:
                                    self.delete_file(file_['source file'])
                                except Exception:
                                    pass
                            del self.localinfo['server_files'][ req['uuid'] ]
                            self.localinfo_dirty = True

                elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
                    try:
                        if dom == None:
                            self.logger.debug("action_on_server id='%s' domain not running", server_id)
                        else:
                            dom.shutdown()
                        # new_status = 'INACTIVE'
                        #TODO: check status for changing at database
                    except Exception as e:
                        new_status = 'ERROR'
                        self.logger.error("action_on_server id='%s' Exception while shutdown: %s",
                                          server_id, e.get_error_message())
                        last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()

                elif 'rebuild' in req['action']:
                    if dom != None:
                        dom.destroy()
                    r = self.launch_server(conn, req, True, None)
                    if r[0] < 0:
                        new_status = 'ERROR'
                        last_error = r[1]
                    else:
                        new_status = 'ACTIVE'
                elif 'start' in req['action']:
                    # The instance is only create in DB but not yet at libvirt domain, needs to be create
                    rebuild = True if req['action']['start'] == 'rebuild' else False
                    r = self.launch_server(conn, req, rebuild, dom)
                    if r[0] < 0:
                        new_status = 'ERROR'
                        last_error = r[1]
                    else:
                        new_status = 'ACTIVE'

                elif 'resume' in req['action']:
                    try:
                        if dom != None:
                            dom.resume()
                            # new_status = 'ACTIVE'
                    except Exception as e:
                        self.logger.error("action_on_server id='%s' Exception while resume: %s",
                                          server_id, e.get_error_message())

                elif 'pause' in req['action']:
                    try:
                        if dom != None:
                            dom.suspend()
                            # new_status = 'PAUSED'
                    except Exception as e:
                        self.logger.error("action_on_server id='%s' Exception while pause: %s",
                                          server_id, e.get_error_message())

                elif 'reboot' in req['action']:
                    try:
                        if dom != None:
                            dom.reboot()
                        self.logger.debug("action_on_server id='%s' reboot:", server_id)
                        #new_status = 'ACTIVE'
                    except Exception as e:
                        self.logger.error("action_on_server id='%s' Exception while reboot: %s",
                                          server_id, e.get_error_message())
                elif 'createImage' in req['action']:
                    self.create_image(dom, req)

                conn.close()
            except host_thread.lvirt_module.libvirtError as e:
                if conn is not None: conn.close()
                text = e.get_error_message()
                new_status = "ERROR"
                last_error = text
                if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
                    self.logger.debug("action_on_server id='%s' Exception removed from host", server_id)
                else:
                    self.logger.error("action_on_server id='%s' Exception %s", server_id, text)
        #end of if self.test
        if new_status == None:
            return 1

        self.logger.debug("action_on_server id='%s' new status=%s %s",server_id, new_status, last_error)
        UPDATE = {'progress':100, 'status':new_status}

        if new_status=='ERROR':
            if not last_retry:  #if there will be another retry do not update database
                return -1
            elif 'terminate' in req['action']:
                #PUT a log in the database
                self.logger.error("PANIC deleting server id='%s' %s", server_id, last_error)
                self.db_lock.acquire()
                self.db.new_row('logs',
                                {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
                                 'description':'PANIC deleting server from host '+self.name+': '+last_error}
                                )
                self.db_lock.release()
                if server_id in self.server_status:
                    del self.server_status[server_id]
                return -1
            else:
                UPDATE['last_error'] = last_error
        if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
            self.db_lock.acquire()
            self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
            self.server_status[server_id] = new_status
            self.db_lock.release()
        if new_status == 'ERROR':
            return -1
        return 1
    def restore_iface(self, name, mac, lib_conn=None):
        ''' make an ifdown, ifup to restore default parameter of na interface
            Params:
                name: interface name (used for logging only)
                mac: mac address of the interface
                lib_conn: connection to the libvirt, if None a new connection is created
            Return 0,None if ok, -1,text if fails
        '''
        # NOTE(review): elided lines restored from the visible fragments.
        conn = None
        ret = 0
        error_text = None
        if self.test:
            self.logger.debug("restore_iface '%s' %s", name, mac)
            return 0, None
        try:
            if not lib_conn:
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
            else:
                conn = lib_conn

            #wait to the pending VM deletion
            #TODO.Revise  self.server_forceoff(True)

            iface = conn.interfaceLookupByMACString(mac)
            # bounce the interface: destroy if active, then (re)create
            if iface.isActive():
                iface.destroy()
            iface.create()
            self.logger.debug("restore_iface '%s' %s", name, mac)
        except host_thread.lvirt_module.libvirtError as e:
            error_text = e.get_error_message()
            self.logger.error("restore_iface '%s' '%s' libvirt exception: %s", name, mac, error_text)
            ret = -1
        finally:
            # only close connections we opened ourselves
            if lib_conn is None and conn is not None:
                conn.close()
        return ret, error_text
    def create_image(self, dom, req):
        """Snapshot a server disk into a new image and update the 'images' DB row.

        :param dom: libvirt domain (unused in test mode)
        :param req: request dict with 'uuid', 'action'['createImage'] and 'new_image'['uuid']
        """
        # NOTE(review): elided lines (test-mode branch, retry loop) restored from
        # the visible fragments — verify against upstream.
        if self.test:
            if 'path' in req['action']['createImage']:
                file_dst = req['action']['createImage']['path']
            else:
                createImage=req['action']['createImage']
                img_name= createImage['source']['path']
                index=img_name.rfind('/')
                # derive destination next to the source image
                file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
            image_status='ACTIVE'
        else:
            for retry in (0, 1):
                try:
                    server_id = req['uuid']
                    createImage=req['action']['createImage']
                    file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
                    if 'path' in req['action']['createImage']:
                        file_dst = req['action']['createImage']['path']
                    else:
                        img_name= createImage['source']['path']
                        index=img_name.rfind('/')
                        file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')

                    self.copy_file(file_orig, file_dst)
                    qemu_info = self.qemu_get_info(file_orig)
                    if 'backing file' in qemu_info:
                        # repoint the copy to the repository image the local
                        # backing file was derived from
                        for k,v in self.localinfo['files'].items():
                            if v==qemu_info['backing file']:
                                self.qemu_change_backing(file_dst, k)
                    image_status='ACTIVE'
                    break
                except paramiko.ssh_exception.SSHException as e:
                    image_status='ERROR'
                    error_text = e.args[0]
                    self.logger.error("create_image id='%s' ssh Exception: %s", server_id, error_text)
                    if "SSH session not active" in error_text and retry==0:
                        self.ssh_connect()  # reconnect once, then retry
                except Exception as e:
                    image_status='ERROR'
                    error_text = str(e)
                    self.logger.error("create_image id='%s' Exception: %s", server_id, error_text)

        #TODO insert a last_error at database
        self.db_lock.acquire()
        self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
                            {'uuid':req['new_image']['uuid']}, log=True)
        self.db_lock.release()
1972 def edit_iface(self
, port_id
, old_net
, new_net
):
1973 #This action imply remove and insert interface to put proper parameters
1978 self
.db_lock
.acquire()
1979 r
,c
= self
.db
.get_table(FROM
='ports as p join resources_port as rp on p.uuid=rp.port_id',
1980 WHERE
={'port_id': port_id
})
1981 self
.db_lock
.release()
1983 self
.logger
.error("edit_iface %s DDBB error: %s", port_id
, c
)
1986 self
.logger
.error("edit_iface %s port not found", port_id
)
1989 if port
["model"]!="VF":
1990 self
.logger
.error("edit_iface %s ERROR model must be VF", port_id
)
1992 #create xml detach file
1995 xml
.append("<interface type='hostdev' managed='yes'>")
1996 xml
.append(" <mac address='" +port
['mac']+ "'/>")
1997 xml
.append(" <source>"+ self
.pci2xml(port
['pci'])+"\n </source>")
1998 xml
.append('</interface>')
2003 conn
= host_thread
.lvirt_module
.open("qemu+ssh://"+self
.user
+"@"+self
.host
+"/system")
2004 dom
= conn
.lookupByUUIDString(port
["instance_id"])
2007 self
.logger
.debug("edit_iface detaching SRIOV interface " + text
)
2008 dom
.detachDeviceFlags(text
, flags
=host_thread
.lvirt_module
.VIR_DOMAIN_AFFECT_LIVE
)
2010 xml
[-1] =" <vlan> <tag id='" + str(port
['vlan']) + "'/> </vlan>"
2012 xml
.append(self
.pci2xml(port
.get('vpci',None)) )
2013 xml
.append('</interface>')
2015 self
.logger
.debug("edit_iface attaching SRIOV interface " + text
)
2016 dom
.attachDeviceFlags(text
, flags
=host_thread
.lvirt_module
.VIR_DOMAIN_AFFECT_LIVE
)
2018 except host_thread
.lvirt_module
.libvirtError
as e
:
2019 text
= e
.get_error_message()
2020 self
.logger
.error("edit_iface %s libvirt exception: %s", port
["instance_id"], text
)
2023 if conn
is not None: conn
.close()
2026 def create_server(server
, db
, db_lock
, only_of_ports
):
2027 extended
= server
.get('extended', None)
2029 requirements
['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
2030 requirements
['ram'] = server
['flavor'].get('ram', 0)
2031 if requirements
['ram']== None:
2032 requirements
['ram'] = 0
2033 requirements
['vcpus'] = server
['flavor'].get('vcpus', 0)
2034 if requirements
['vcpus']== None:
2035 requirements
['vcpus'] = 0
2036 #If extended is not defined get requirements from flavor
2037 if extended
is None:
2038 #If extended is defined in flavor convert to dictionary and use it
2039 if 'extended' in server
['flavor'] and server
['flavor']['extended'] != None:
2040 json_acceptable_string
= server
['flavor']['extended'].replace("'", "\"")
2041 extended
= json
.loads(json_acceptable_string
)
2044 #print json.dumps(extended, indent=4)
2046 #For simplicity only one numa VM are supported in the initial implementation
2047 if extended
!= None:
2048 numas
= extended
.get('numas', [])
2050 return (-2, "Multi-NUMA VMs are not supported yet")
2052 # return (-1, "At least one numa must be specified")
2054 #a for loop is used in order to be ready to multi-NUMA VMs
2058 numa_req
['memory'] = numa
.get('memory', 0)
2060 numa_req
['proc_req_nb'] = numa
['cores'] #number of cores or threads to be reserved
2061 numa_req
['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
2062 numa_req
['proc_req_list'] = numa
.get('cores-id', None) #list of ids to be assigned to the cores or threads
2063 elif 'paired-threads' in numa
:
2064 numa_req
['proc_req_nb'] = numa
['paired-threads']
2065 numa_req
['proc_req_type'] = 'paired-threads'
2066 numa_req
['proc_req_list'] = numa
.get('paired-threads-id', None)
2067 elif 'threads' in numa
:
2068 numa_req
['proc_req_nb'] = numa
['threads']
2069 numa_req
['proc_req_type'] = 'threads'
2070 numa_req
['proc_req_list'] = numa
.get('threads-id', None)
2072 numa_req
['proc_req_nb'] = 0 # by default
2073 numa_req
['proc_req_type'] = 'threads'
2077 #Generate a list of sriov and another for physical interfaces
2078 interfaces
= numa
.get('interfaces', [])
2081 for iface
in interfaces
:
2082 iface
['bandwidth'] = int(iface
['bandwidth'])
2083 if iface
['dedicated'][:3]=='yes':
2084 port_list
.append(iface
)
2086 sriov_list
.append(iface
)
2088 #Save lists ordered from more restrictive to less bw requirements
2089 numa_req
['sriov_list'] = sorted(sriov_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
2090 numa_req
['port_list'] = sorted(port_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
2093 request
.append(numa_req
)
2095 # print "----------\n"+json.dumps(request[0], indent=4)
2096 # print '----------\n\n'
2098 #Search in db for an appropriate numa for each requested numa
2099 #at the moment multi-NUMA VMs are not supported
2101 requirements
['numa'].update(request
[0])
2102 if requirements
['numa']['memory']>0:
2103 requirements
['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
2104 elif requirements
['ram']==0:
2105 return (-1, "Memory information not set neither at extended field not at ram")
2106 if requirements
['numa']['proc_req_nb']>0:
2107 requirements
['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
2108 elif requirements
['vcpus']==0:
2109 return (-1, "Processor information not set neither at extended field not at vcpus")
2113 result
, content
= db
.get_numas(requirements
, server
.get('host_id', None), only_of_ports
)
2117 return (-1, content
)
2119 numa_id
= content
['numa_id']
2120 host_id
= content
['host_id']
2122 #obtain threads_id and calculate pinning
2125 if requirements
['numa']['proc_req_nb']>0:
2127 result
, content
= db
.get_table(FROM
='resources_core',
2128 SELECT
=('id','core_id','thread_id'),
2129 WHERE
={'numa_id':numa_id
,'instance_id': None, 'status':'ok'} )
2135 #convert rows to a dictionary indexed by core_id
2138 if not row
['core_id'] in cores_dict
:
2139 cores_dict
[row
['core_id']] = []
2140 cores_dict
[row
['core_id']].append([row
['thread_id'],row
['id']])
2142 #In case full cores are requested
2144 if requirements
['numa']['proc_req_type'] == 'cores':
2145 #Get/create the list of the vcpu_ids
2146 vcpu_id_list
= requirements
['numa']['proc_req_list']
2147 if vcpu_id_list
== None:
2148 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
2150 for threads
in cores_dict
.itervalues():
2152 if len(threads
) != 2:
2155 #set pinning for the first thread
2156 cpu_pinning
.append( [ vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1] ] )
2158 #reserve so it is not used the second thread
2159 reserved_threads
.append(threads
[1][1])
2161 if len(vcpu_id_list
) == 0:
2164 #In case paired threads are requested
2165 elif requirements
['numa']['proc_req_type'] == 'paired-threads':
2167 #Get/create the list of the vcpu_ids
2168 if requirements
['numa']['proc_req_list'] != None:
2170 for pair
in requirements
['numa']['proc_req_list']:
2172 return -1, "Field paired-threads-id not properly specified"
2174 vcpu_id_list
.append(pair
[0])
2175 vcpu_id_list
.append(pair
[1])
2177 vcpu_id_list
= range(0,2*int(requirements
['numa']['proc_req_nb']))
2179 for threads
in cores_dict
.itervalues():
2181 if len(threads
) != 2:
2183 #set pinning for the first thread
2184 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
2186 #set pinning for the second thread
2187 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
2189 if len(vcpu_id_list
) == 0:
2192 #In case normal threads are requested
2193 elif requirements
['numa']['proc_req_type'] == 'threads':
2194 #Get/create the list of the vcpu_ids
2195 vcpu_id_list
= requirements
['numa']['proc_req_list']
2196 if vcpu_id_list
== None:
2197 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
2199 for threads_index
in sorted(cores_dict
, key
=lambda k
: len(cores_dict
[k
])):
2200 threads
= cores_dict
[threads_index
]
2201 #set pinning for the first thread
2202 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
2204 #if exists, set pinning for the second thread
2205 if len(threads
) == 2 and len(vcpu_id_list
) != 0:
2206 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
2208 if len(vcpu_id_list
) == 0:
2211 #Get the source pci addresses for the selected numa
2212 used_sriov_ports
= []
2213 for port
in requirements
['numa']['sriov_list']:
2215 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
2221 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
2223 port
['pci'] = row
['pci']
2224 if 'mac_address' not in port
:
2225 port
['mac_address'] = row
['mac']
2227 port
['port_id']=row
['id']
2228 port
['Mbps_used'] = port
['bandwidth']
2229 used_sriov_ports
.append(row
['id'])
2232 for port
in requirements
['numa']['port_list']:
2233 port
['Mbps_used'] = None
2234 if port
['dedicated'] != "yes:sriov":
2235 port
['mac_address'] = port
['mac']
2239 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac', 'Mbps'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
2244 port
['Mbps_used'] = content
[0]['Mbps']
2246 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
2248 port
['pci'] = row
['pci']
2249 if 'mac_address' not in port
:
2250 port
['mac_address'] = row
['mac'] # mac cannot be set to passthrough ports
2252 port
['port_id']=row
['id']
2253 used_sriov_ports
.append(row
['id'])
2256 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
2257 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
2259 server
['host_id'] = host_id
2261 #Generate dictionary for saving in db the instance resources
2263 resources
['bridged-ifaces'] = []
2266 numa_dict
['interfaces'] = []
2268 numa_dict
['interfaces'] += requirements
['numa']['port_list']
2269 numa_dict
['interfaces'] += requirements
['numa']['sriov_list']
2271 #Check bridge information
2272 unified_dataplane_iface
=[]
2273 unified_dataplane_iface
+= requirements
['numa']['port_list']
2274 unified_dataplane_iface
+= requirements
['numa']['sriov_list']
2276 for control_iface
in server
.get('networks', []):
2277 control_iface
['net_id']=control_iface
.pop('uuid')
2278 #Get the brifge name
2280 result
, content
= db
.get_table(FROM
='nets',
2281 SELECT
=('name', 'type', 'vlan', 'provider', 'enable_dhcp',
2282 'dhcp_first_ip', 'dhcp_last_ip', 'cidr'),
2283 WHERE
={'uuid': control_iface
['net_id']})
2288 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface
['net_id']
2291 if control_iface
.get("type", 'virtual') == 'virtual':
2292 if network
['type']!='bridge_data' and network
['type']!='bridge_man':
2293 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface
['net_id']
2294 resources
['bridged-ifaces'].append(control_iface
)
2295 if network
.get("provider") and network
["provider"][0:3] == "OVS":
2296 control_iface
["type"] = "instance:ovs"
2298 control_iface
["type"] = "instance:bridge"
2299 if network
.get("vlan"):
2300 control_iface
["vlan"] = network
["vlan"]
2302 if network
.get("enable_dhcp") == 'true':
2303 control_iface
["enable_dhcp"] = network
.get("enable_dhcp")
2304 control_iface
["dhcp_first_ip"] = network
["dhcp_first_ip"]
2305 control_iface
["dhcp_last_ip"] = network
["dhcp_last_ip"]
2306 control_iface
["cidr"] = network
["cidr"]
2308 if network
['type']!='data' and network
['type']!='ptp':
2309 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface
['net_id']
2310 #dataplane interface, look for it in the numa tree and asign this network
2312 for dataplane_iface
in numa_dict
['interfaces']:
2313 if dataplane_iface
['name'] == control_iface
.get("name"):
2314 if (dataplane_iface
['dedicated'] == "yes" and control_iface
["type"] != "PF") or \
2315 (dataplane_iface
['dedicated'] == "no" and control_iface
["type"] != "VF") or \
2316 (dataplane_iface
['dedicated'] == "yes:sriov" and control_iface
["type"] != "VFnotShared") :
2317 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
2318 (control_iface
.get("name"), dataplane_iface
['dedicated'], control_iface
["type"])
2319 dataplane_iface
['uuid'] = control_iface
['net_id']
2320 if dataplane_iface
['dedicated'] == "no":
2321 dataplane_iface
['vlan'] = network
['vlan']
2322 if dataplane_iface
['dedicated'] != "yes" and control_iface
.get("mac_address"):
2323 dataplane_iface
['mac_address'] = control_iface
.get("mac_address")
2324 if control_iface
.get("vpci"):
2325 dataplane_iface
['vpci'] = control_iface
.get("vpci")
2329 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface
.get("name")
2331 resources
['host_id'] = host_id
2332 resources
['image_id'] = server
['image_id']
2333 resources
['flavor_id'] = server
['flavor_id']
2334 resources
['tenant_id'] = server
['tenant_id']
2335 resources
['ram'] = requirements
['ram']
2336 resources
['vcpus'] = requirements
['vcpus']
2337 resources
['status'] = 'CREATING'
2339 if 'description' in server
: resources
['description'] = server
['description']
2340 if 'name' in server
: resources
['name'] = server
['name']
2342 resources
['extended'] = {} #optional
2343 resources
['extended']['numas'] = []
2344 numa_dict
['numa_id'] = numa_id
2345 numa_dict
['memory'] = requirements
['numa']['memory']
2346 numa_dict
['cores'] = []
2348 for core
in cpu_pinning
:
2349 numa_dict
['cores'].append({'id': core
[2], 'vthread': core
[0], 'paired': paired
})
2350 for core
in reserved_threads
:
2351 numa_dict
['cores'].append({'id': core
})
2352 resources
['extended']['numas'].append(numa_dict
)
2353 if extended
!=None and 'devices' in extended
: #TODO allow extra devices without numa
2354 resources
['extended']['devices'] = extended
['devices']
2357 # '===================================={'
2358 #print json.dumps(resources, indent=4)
2359 #print '====================================}'