1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
This is a thread that interacts with the host and with libvirt to manage VMs.
One thread is launched per host.
28 __author__
= "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__
= "$10-jul-2014 12:07:15$"
43 from jsonschema
import validate
as js_v
, exceptions
as js_e
44 from vim_schema
import localinfo_schema
, hostinfo_schema
47 class host_thread(threading
.Thread
):
50 def __init__(self
, name
, host
, user
, db
, db_lock
, test
, image_path
, host_id
, version
, develop_mode
,
51 develop_bridge_iface
, password
=None, keyfile
= None, logger_name
=None, debug
=None):
56 'host','user': host ip or name to manage and user
57 'db', 'db_lock': database class and lock to use it in exclusion
59 threading
.Thread
.__init
__(self
)
64 self
.db_lock
= db_lock
66 self
.password
= password
67 self
.keyfile
= keyfile
68 self
.localinfo_dirty
= False
70 if not test
and not host_thread
.lvirt_module
:
72 module_info
= imp
.find_module("libvirt")
73 host_thread
.lvirt_module
= imp
.load_module("libvirt", *module_info
)
74 except (IOError, ImportError) as e
:
75 raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e
))
77 self
.logger_name
= logger_name
79 self
.logger_name
= "openvim.host."+name
80 self
.logger
= logging
.getLogger(self
.logger_name
)
82 self
.logger
.setLevel(getattr(logging
, debug
))
85 self
.develop_mode
= develop_mode
86 self
.develop_bridge_iface
= develop_bridge_iface
87 self
.image_path
= image_path
88 self
.empty_image_path
= image_path
89 self
.host_id
= host_id
90 self
.version
= version
95 self
.server_status
= {} #dictionary with pairs server_uuid:server_status
96 self
.pending_terminate_server
=[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
97 self
.next_update_server_status
= 0 #time when must be check servers status
101 self
.queueLock
= threading
.Lock()
102 self
.taskQueue
= Queue
.Queue(2000)
104 self
.lvirt_conn_uri
= "qemu+ssh://{user}@{host}/system?no_tty=1&no_verify=1".format(
105 user
=self
.user
, host
=self
.host
)
107 self
.lvirt_conn_uri
+= "&keyfile=" + keyfile
109 def ssh_connect(self
):
112 self
.ssh_conn
= paramiko
.SSHClient()
113 self
.ssh_conn
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
114 self
.ssh_conn
.load_system_host_keys()
115 self
.ssh_conn
.connect(self
.host
, username
=self
.user
, password
=self
.password
, key_filename
=self
.keyfile
,
117 except paramiko
.ssh_exception
.SSHException
as e
:
119 self
.logger
.error("ssh_connect ssh Exception: " + text
)
121 def load_localinfo(self
):
127 command
= 'mkdir -p ' + self
.image_path
128 # print self.name, ': command:', command
129 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
130 content
= stderr
.read()
132 self
.logger
.error("command: '%s' stderr: '%s'", command
, content
)
134 command
= 'cat ' + self
.image_path
+ '/.openvim.yaml'
135 # print self.name, ': command:', command
136 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
137 content
= stdout
.read()
138 if len(content
) == 0:
139 self
.logger
.error("command: '%s' stderr='%s'", command
, stderr
.read())
140 raise paramiko
.ssh_exception
.SSHException("Error empty file, command: '{}'".format(command
))
141 self
.localinfo
= yaml
.load(content
)
142 js_v(self
.localinfo
, localinfo_schema
)
143 self
.localinfo_dirty
= False
144 if 'server_files' not in self
.localinfo
:
145 self
.localinfo
['server_files'] = {}
146 self
.logger
.debug("localinfo load from host")
149 except paramiko
.ssh_exception
.SSHException
as e
:
151 self
.logger
.error("load_localinfo ssh Exception: " + text
)
152 except host_thread
.lvirt_module
.libvirtError
as e
:
153 text
= e
.get_error_message()
154 self
.logger
.error("load_localinfo libvirt Exception: " + text
)
155 except yaml
.YAMLError
as exc
:
157 if hasattr(exc
, 'problem_mark'):
158 mark
= exc
.problem_mark
159 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
160 self
.logger
.error("load_localinfo yaml format Exception " + text
)
161 except js_e
.ValidationError
as e
:
163 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
164 self
.logger
.error("load_localinfo format Exception: %s %s", text
, str(e
))
165 except Exception as e
:
167 self
.logger
.error("load_localinfo Exception: " + text
)
169 #not loaded, insert a default data and force saving by activating dirty flag
170 self
.localinfo
= {'files':{}, 'server_files':{} }
171 #self.localinfo_dirty=True
172 self
.localinfo_dirty
=False
174 def load_hostinfo(self
):
182 command
= 'cat ' + self
.image_path
+ '/hostinfo.yaml'
183 #print self.name, ': command:', command
184 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
185 content
= stdout
.read()
186 if len(content
) == 0:
187 self
.logger
.error("command: '%s' stderr: '%s'", command
, stderr
.read())
188 raise paramiko
.ssh_exception
.SSHException("Error empty file ")
189 self
.hostinfo
= yaml
.load(content
)
190 js_v(self
.hostinfo
, hostinfo_schema
)
191 self
.logger
.debug("hostlinfo load from host " + str(self
.hostinfo
))
194 except paramiko
.ssh_exception
.SSHException
as e
:
196 self
.logger
.error("load_hostinfo ssh Exception: " + text
)
197 except host_thread
.lvirt_module
.libvirtError
as e
:
198 text
= e
.get_error_message()
199 self
.logger
.error("load_hostinfo libvirt Exception: " + text
)
200 except yaml
.YAMLError
as exc
:
202 if hasattr(exc
, 'problem_mark'):
203 mark
= exc
.problem_mark
204 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
205 self
.logger
.error("load_hostinfo yaml format Exception " + text
)
206 except js_e
.ValidationError
as e
:
208 if len(e
.path
)>0: text
=" at '" + ":".join(map(str, e
.path
))+"'"
209 self
.logger
.error("load_hostinfo format Exception: %s %s", text
, e
.message
)
210 except Exception as e
:
212 self
.logger
.error("load_hostinfo Exception: " + text
)
214 #not loaded, insert a default data
217 def save_localinfo(self
, tries
=3):
219 self
.localinfo_dirty
= False
226 command
= 'cat > ' + self
.image_path
+ '/.openvim.yaml'
227 self
.logger
.debug("command:" + command
)
228 (stdin
, _
, _
) = self
.ssh_conn
.exec_command(command
)
229 yaml
.safe_dump(self
.localinfo
, stdin
, explicit_start
=True, indent
=4, default_flow_style
=False, tags
=False, encoding
='utf-8', allow_unicode
=True)
231 self
.localinfo_dirty
= False
234 except paramiko
.ssh_exception
.SSHException
as e
:
236 self
.logger
.error("save_localinfo ssh Exception: " + text
)
237 if "SSH session not active" in text
:
239 except host_thread
.lvirt_module
.libvirtError
as e
:
240 text
= e
.get_error_message()
241 self
.logger
.error("save_localinfo libvirt Exception: " + text
)
242 except yaml
.YAMLError
as exc
:
244 if hasattr(exc
, 'problem_mark'):
245 mark
= exc
.problem_mark
246 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
247 self
.logger
.error("save_localinfo yaml format Exception " + text
)
248 except Exception as e
:
250 self
.logger
.error("save_localinfo Exception: " + text
)
252 def load_servers_from_db(self
):
253 self
.db_lock
.acquire()
254 r
,c
= self
.db
.get_table(SELECT
=('uuid','status', 'image_id'), FROM
='instances', WHERE
={'host_id': self
.host_id
})
255 self
.db_lock
.release()
257 self
.server_status
= {}
259 self
.logger
.error("Error getting data from database: " + c
)
262 self
.server_status
[ server
['uuid'] ] = server
['status']
264 #convert from old version to new one
265 if 'inc_files' in self
.localinfo
and server
['uuid'] in self
.localinfo
['inc_files']:
266 server_files_dict
= {'source file': self
.localinfo
['inc_files'][ server
['uuid'] ] [0], 'file format':'raw' }
267 if server_files_dict
['source file'][-5:] == 'qcow2':
268 server_files_dict
['file format'] = 'qcow2'
270 self
.localinfo
['server_files'][ server
['uuid'] ] = { server
['image_id'] : server_files_dict
}
271 if 'inc_files' in self
.localinfo
:
272 del self
.localinfo
['inc_files']
273 self
.localinfo_dirty
= True
275 def delete_unused_files(self
):
276 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
277 Deletes unused entries at self.loacalinfo and the corresponding local files.
278 The only reason for this mismatch is the manual deletion of instances (VM) at database
282 for uuid
,images
in self
.localinfo
['server_files'].items():
283 if uuid
not in self
.server_status
:
284 for localfile
in images
.values():
286 self
.logger
.debug("deleting file '%s' of unused server '%s'", localfile
['source file'], uuid
)
287 self
.delete_file(localfile
['source file'])
288 except paramiko
.ssh_exception
.SSHException
as e
:
289 self
.logger
.error("Exception deleting file '%s': %s", localfile
['source file'], str(e
))
290 del self
.localinfo
['server_files'][uuid
]
291 self
.localinfo_dirty
= True
293 def insert_task(self
, task
, *aditional
):
295 self
.queueLock
.acquire()
296 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
297 self
.queueLock
.release()
300 return -1, "timeout inserting a task over host " + self
.name
304 self
.load_localinfo()
306 self
.load_servers_from_db()
307 self
.delete_unused_files()
310 self
.queueLock
.acquire()
311 if not self
.taskQueue
.empty():
312 task
= self
.taskQueue
.get()
315 self
.queueLock
.release()
319 if self
.localinfo_dirty
:
320 self
.save_localinfo()
321 elif self
.next_update_server_status
< now
:
322 self
.update_servers_status()
323 self
.next_update_server_status
= now
+ 5
324 elif len(self
.pending_terminate_server
)>0 and self
.pending_terminate_server
[0][0]<now
:
325 self
.server_forceoff()
330 if task
[0] == 'instance':
331 self
.logger
.debug("processing task instance " + str(task
[1]['action']))
335 r
= self
.action_on_server(task
[1], retry
==2)
338 elif task
[0] == 'image':
340 elif task
[0] == 'exit':
341 self
.logger
.debug("processing task exit")
344 elif task
[0] == 'reload':
345 self
.logger
.debug("processing task reload terminating and relaunching")
348 elif task
[0] == 'edit-iface':
349 self
.logger
.debug("processing task edit-iface port={}, old_net={}, new_net={}".format(
350 task
[1], task
[2], task
[3]))
351 self
.edit_iface(task
[1], task
[2], task
[3])
352 elif task
[0] == 'restore-iface':
353 self
.logger
.debug("processing task restore-iface={} mac={}".format(task
[1], task
[2]))
354 self
.restore_iface(task
[1], task
[2])
355 elif task
[0] == 'new-ovsbridge':
356 self
.logger
.debug("Creating compute OVS bridge")
357 self
.create_ovs_bridge()
358 elif task
[0] == 'new-vxlan':
359 self
.logger
.debug("Creating vxlan tunnel='{}', remote ip='{}'".format(task
[1], task
[2]))
360 self
.create_ovs_vxlan_tunnel(task
[1], task
[2])
361 elif task
[0] == 'del-ovsbridge':
362 self
.logger
.debug("Deleting OVS bridge")
363 self
.delete_ovs_bridge()
364 elif task
[0] == 'del-vxlan':
365 self
.logger
.debug("Deleting vxlan {} tunnel".format(task
[1]))
366 self
.delete_ovs_vxlan_tunnel(task
[1])
367 elif task
[0] == 'create-ovs-bridge-port':
368 self
.logger
.debug("Adding port ovim-{} to OVS bridge".format(task
[1]))
369 self
.create_ovs_bridge_port(task
[1])
370 elif task
[0] == 'del-ovs-port':
371 self
.logger
.debug("Delete bridge attached to ovs port vlan {} net {}".format(task
[1], task
[2]))
372 self
.delete_bridge_port_attached_to_ovs(task
[1], task
[2])
374 self
.logger
.debug("unknown task " + str(task
))
376 except Exception as e
:
377 self
.logger
.critical("Unexpected exception at run: " + str(e
), exc_info
=True)
379 def server_forceoff(self
, wait_until_finished
=False):
380 while len(self
.pending_terminate_server
)>0:
382 if self
.pending_terminate_server
[0][0]>now
:
383 if wait_until_finished
:
388 req
={'uuid':self
.pending_terminate_server
[0][1],
389 'action':{'terminate':'force'},
392 self
.action_on_server(req
)
393 self
.pending_terminate_server
.pop(0)
397 self
.server_forceoff(True)
398 if self
.localinfo_dirty
:
399 self
.save_localinfo()
401 self
.ssh_conn
.close()
402 except Exception as e
:
404 self
.logger
.error("terminate Exception: " + text
)
405 self
.logger
.debug("exit from host_thread")
407 def get_local_iface_name(self
, generic_name
):
408 if self
.hostinfo
!= None and "iface_names" in self
.hostinfo
and generic_name
in self
.hostinfo
["iface_names"]:
409 return self
.hostinfo
["iface_names"][generic_name
]
412 def create_xml_server(self
, server
, dev_list
, server_metadata
={}):
413 """Function that implements the generation of the VM XML definition.
414 Additional devices are in dev_list list
415 The main disk is upon dev_list[0]"""
417 #get if operating system is Windows
419 os_type
= server_metadata
.get('os_type', None)
420 if os_type
== None and 'metadata' in dev_list
[0]:
421 os_type
= dev_list
[0]['metadata'].get('os_type', None)
422 if os_type
!= None and os_type
.lower() == "windows":
424 #get type of hard disk bus
425 bus_ide
= True if windows_os
else False
426 bus
= server_metadata
.get('bus', None)
427 if bus
== None and 'metadata' in dev_list
[0]:
428 bus
= dev_list
[0]['metadata'].get('bus', None)
430 bus_ide
= True if bus
=='ide' else False
434 text
= "<domain type='kvm'>"
436 topo
= server_metadata
.get('topology', None)
437 if topo
== None and 'metadata' in dev_list
[0]:
438 topo
= dev_list
[0]['metadata'].get('topology', None)
440 name
= server
.get('name', '')[:28] + "_" + server
['uuid'][:28] #qemu impose a length limit of 59 chars or not start. Using 58
441 text
+= self
.inc_tab() + "<name>" + name
+ "</name>"
443 text
+= self
.tab() + "<uuid>" + server
['uuid'] + "</uuid>"
446 if 'extended' in server
and server
['extended']!=None and 'numas' in server
['extended']:
447 numa
= server
['extended']['numas'][0]
450 memory
= int(numa
.get('memory',0))*1024*1024 #in KiB
452 memory
= int(server
['ram'])*1024;
454 if not self
.develop_mode
:
457 return -1, 'No memory assigned to instance'
459 text
+= self
.tab() + "<memory unit='KiB'>" +memory
+"</memory>"
460 text
+= self
.tab() + "<currentMemory unit='KiB'>" +memory
+ "</currentMemory>"
462 text
+= self
.tab()+'<memoryBacking>'+ \
463 self
.inc_tab() + '<hugepages/>'+ \
464 self
.dec_tab()+ '</memoryBacking>'
467 use_cpu_pinning
=False
468 vcpus
= int(server
.get("vcpus",0))
470 if 'cores-source' in numa
:
472 for index
in range(0, len(numa
['cores-source'])):
473 cpu_pinning
.append( [ numa
['cores-id'][index
], numa
['cores-source'][index
] ] )
475 if 'threads-source' in numa
:
477 for index
in range(0, len(numa
['threads-source'])):
478 cpu_pinning
.append( [ numa
['threads-id'][index
], numa
['threads-source'][index
] ] )
480 if 'paired-threads-source' in numa
:
482 for index
in range(0, len(numa
['paired-threads-source'])):
483 cpu_pinning
.append( [numa
['paired-threads-id'][index
][0], numa
['paired-threads-source'][index
][0] ] )
484 cpu_pinning
.append( [numa
['paired-threads-id'][index
][1], numa
['paired-threads-source'][index
][1] ] )
487 if use_cpu_pinning
and not self
.develop_mode
:
488 text
+= self
.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning
)) +"</vcpu>" + \
489 self
.tab()+'<cputune>'
491 for i
in range(0, len(cpu_pinning
)):
492 text
+= self
.tab() + "<vcpupin vcpu='" +str(cpu_pinning
[i
][0])+ "' cpuset='" +str(cpu_pinning
[i
][1]) +"'/>"
493 text
+= self
.dec_tab()+'</cputune>'+ \
494 self
.tab() + '<numatune>' +\
495 self
.inc_tab() + "<memory mode='strict' nodeset='" +str(numa
['source'])+ "'/>" +\
496 self
.dec_tab() + '</numatune>'
499 return -1, "Instance without number of cpus"
500 text
+= self
.tab()+"<vcpu>" + str(vcpus
) + "</vcpu>"
505 if dev
['type']=='cdrom' :
508 text
+= self
.tab()+ '<os>' + \
509 self
.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
511 text
+= self
.tab() + "<boot dev='cdrom'/>"
512 text
+= self
.tab() + "<boot dev='hd'/>" + \
513 self
.dec_tab()+'</os>'
515 text
+= self
.tab()+'<features>'+\
516 self
.inc_tab()+'<acpi/>' +\
517 self
.tab()+'<apic/>' +\
518 self
.tab()+'<pae/>'+ \
519 self
.dec_tab() +'</features>'
520 if topo
== "oneSocket:hyperthreading":
522 return -1, 'Cannot expose hyperthreading with an odd number of vcpus'
523 text
+= self
.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='2' /> </cpu>" % (vcpus
/2)
524 elif windows_os
or topo
== "oneSocket":
525 text
+= self
.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>" % vcpus
527 text
+= self
.tab() + "<cpu mode='host-model'></cpu>"
528 text
+= self
.tab() + "<clock offset='utc'/>" +\
529 self
.tab() + "<on_poweroff>preserve</on_poweroff>" + \
530 self
.tab() + "<on_reboot>restart</on_reboot>" + \
531 self
.tab() + "<on_crash>restart</on_crash>"
532 text
+= self
.tab() + "<devices>" + \
533 self
.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
534 self
.tab() + "<serial type='pty'>" +\
535 self
.inc_tab() + "<target port='0'/>" + \
536 self
.dec_tab() + "</serial>" +\
537 self
.tab() + "<console type='pty'>" + \
538 self
.inc_tab()+ "<target type='serial' port='0'/>" + \
539 self
.dec_tab()+'</console>'
541 text
+= self
.tab() + "<controller type='usb' index='0'/>" + \
542 self
.tab() + "<controller type='ide' index='0'/>" + \
543 self
.tab() + "<input type='mouse' bus='ps2'/>" + \
544 self
.tab() + "<sound model='ich6'/>" + \
545 self
.tab() + "<video>" + \
546 self
.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
547 self
.dec_tab() + "</video>" + \
548 self
.tab() + "<memballoon model='virtio'/>" + \
549 self
.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
551 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
552 #> self.dec_tab()+'</hostdev>\n' +\
553 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
555 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
557 #If image contains 'GRAPH' include graphics
558 #if 'GRAPH' in image:
559 text
+= self
.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
560 self
.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
561 self
.dec_tab() + "</graphics>"
565 bus_ide_dev
= bus_ide
566 if dev
['type']=='cdrom' or dev
['type']=='disk':
567 if dev
['type']=='cdrom':
569 text
+= self
.tab() + "<disk type='file' device='"+dev
['type']+"'>"
570 if 'file format' in dev
:
571 text
+= self
.inc_tab() + "<driver name='qemu' type='" +dev
['file format']+ "' cache='writethrough'/>"
572 if 'source file' in dev
:
573 text
+= self
.tab() + "<source file='" +dev
['source file']+ "'/>"
574 #elif v['type'] == 'block':
575 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
577 # return -1, 'Unknown disk type ' + v['type']
578 vpci
= dev
.get('vpci',None)
579 if vpci
== None and 'metadata' in dev
:
580 vpci
= dev
['metadata'].get('vpci',None)
581 text
+= self
.pci2xml(vpci
)
584 text
+= self
.tab() + "<target dev='hd" +vd_index
+ "' bus='ide'/>" #TODO allows several type of disks
586 text
+= self
.tab() + "<target dev='vd" +vd_index
+ "' bus='virtio'/>"
587 text
+= self
.dec_tab() + '</disk>'
588 vd_index
= chr(ord(vd_index
)+1)
589 elif dev
['type']=='xml':
590 dev_text
= dev
['xml']
592 dev_text
= dev_text
.replace('__vpci__', dev
['vpci'])
593 if 'source file' in dev
:
594 dev_text
= dev_text
.replace('__file__', dev
['source file'])
595 if 'file format' in dev
:
596 dev_text
= dev_text
.replace('__format__', dev
['source file'])
597 if '__dev__' in dev_text
:
598 dev_text
= dev_text
.replace('__dev__', vd_index
)
599 vd_index
= chr(ord(vd_index
)+1)
602 return -1, 'Unknown device type ' + dev
['type']
605 bridge_interfaces
= server
.get('networks', [])
606 for v
in bridge_interfaces
:
608 self
.db_lock
.acquire()
609 result
, content
= self
.db
.get_table(FROM
='nets', SELECT
=('provider',),WHERE
={'uuid':v
['net_id']} )
610 self
.db_lock
.release()
612 self
.logger
.error("create_xml_server ERROR %d getting nets %s", result
, content
)
614 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
615 #I know it is not secure
616 #for v in sorted(desc['network interfaces'].itervalues()):
617 model
= v
.get("model", None)
618 if content
[0]['provider']=='default':
619 text
+= self
.tab() + "<interface type='network'>" + \
620 self
.inc_tab() + "<source network='" +content
[0]['provider']+ "'/>"
621 elif content
[0]['provider'][0:7]=='macvtap':
622 text
+= self
.tab()+"<interface type='direct'>" + \
623 self
.inc_tab() + "<source dev='" + self
.get_local_iface_name(content
[0]['provider'][8:]) + "' mode='bridge'/>" + \
624 self
.tab() + "<target dev='macvtap0'/>"
626 text
+= self
.tab() + "<alias name='net" + str(net_nb
) + "'/>"
629 elif content
[0]['provider'][0:6]=='bridge':
630 text
+= self
.tab() + "<interface type='bridge'>" + \
631 self
.inc_tab()+"<source bridge='" +self
.get_local_iface_name(content
[0]['provider'][7:])+ "'/>"
633 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
634 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
637 elif content
[0]['provider'][0:3] == "OVS":
638 vlan
= content
[0]['provider'].replace('OVS:', '')
639 text
+= self
.tab() + "<interface type='bridge'>" + \
640 self
.inc_tab() + "<source bridge='ovim-" + str(vlan
) + "'/>"
642 return -1, 'Unknown Bridge net provider ' + content
[0]['provider']
644 text
+= self
.tab() + "<model type='" +model
+ "'/>"
645 if v
.get('mac_address', None) != None:
646 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
647 text
+= self
.pci2xml(v
.get('vpci',None))
648 text
+= self
.dec_tab()+'</interface>'
652 interfaces
= numa
.get('interfaces', [])
656 if self
.develop_mode
: #map these interfaces to bridges
657 text
+= self
.tab() + "<interface type='bridge'>" + \
658 self
.inc_tab()+"<source bridge='" +self
.develop_bridge_iface
+ "'/>"
660 text
+= self
.tab() + "<target dev='vnet" + str(net_nb
)+ "'/>" +\
661 self
.tab() + "<alias name='net" + str(net_nb
)+ "'/>"
663 text
+= self
.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
664 if v
.get('mac_address', None) != None:
665 text
+= self
.tab() +"<mac address='" +v
['mac_address']+ "'/>"
666 text
+= self
.pci2xml(v
.get('vpci',None))
667 text
+= self
.dec_tab()+'</interface>'
670 if v
['dedicated'] == 'yes': #passthrought
671 text
+= self
.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
672 self
.inc_tab() + "<source>"
674 text
+= self
.pci2xml(v
['source'])
675 text
+= self
.dec_tab()+'</source>'
676 text
+= self
.pci2xml(v
.get('vpci',None))
678 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
679 text
+= self
.dec_tab()+'</hostdev>'
681 else: #sriov_interfaces
682 #skip not connected interfaces
683 if v
.get("net_id") == None:
685 text
+= self
.tab() + "<interface type='hostdev' managed='yes'>"
687 if v
.get('mac_address', None) != None:
688 text
+= self
.tab() + "<mac address='" +v
['mac_address']+ "'/>"
689 text
+= self
.tab()+'<source>'
691 text
+= self
.pci2xml(v
['source'])
692 text
+= self
.dec_tab()+'</source>'
693 if v
.get('vlan',None) != None:
694 text
+= self
.tab() + "<vlan> <tag id='" + str(v
['vlan']) + "'/> </vlan>"
695 text
+= self
.pci2xml(v
.get('vpci',None))
697 text
+= self
.tab() + "<alias name='hostdev" + str(net_nb
) + "'/>"
698 text
+= self
.dec_tab()+'</interface>'
701 text
+= self
.dec_tab()+'</devices>'+\
702 self
.dec_tab()+'</domain>'
705 def pci2xml(self
, pci
):
706 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
707 alows an empty pci text'''
710 first_part
= pci
.split(':')
711 second_part
= first_part
[2].split('.')
712 return self
.tab() + "<address type='pci' domain='0x" + first_part
[0] + \
713 "' bus='0x" + first_part
[1] + "' slot='0x" + second_part
[0] + \
714 "' function='0x" + second_part
[1] + "'/>"
717 """Return indentation according to xml_level"""
718 return "\n" + (' '*self
.xml_level
)
721 """Increment and return indentation according to xml_level"""
726 """Decrement and return indentation according to xml_level"""
730 def create_ovs_bridge(self
):
732 Create a bridge in compute OVS to allocate VMs
733 :return: True if success
738 command
= 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
739 self
.logger
.debug("command: " + command
)
740 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
741 content
= stdout
.read()
742 if len(content
) == 0:
746 except paramiko
.ssh_exception
.SSHException
as e
:
747 self
.logger
.error("create_ovs_bridge ssh Exception: " + str(e
))
748 if "SSH session not active" in str(e
):
752 def delete_port_to_ovs_bridge(self
, vlan
, net_uuid
):
754 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
755 :param vlan: vlan port id
756 :param net_uuid: network id
763 port_name
= 'ovim-' + str(vlan
)
764 command
= 'sudo ovs-vsctl del-port br-int ' + port_name
765 self
.logger
.debug("command: " + command
)
766 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
767 content
= stdout
.read()
768 if len(content
) == 0:
772 except paramiko
.ssh_exception
.SSHException
as e
:
773 self
.logger
.error("delete_port_to_ovs_bridge ssh Exception: " + str(e
))
774 if "SSH session not active" in str(e
):
778 def delete_dhcp_server(self
, vlan
, net_uuid
, dhcp_path
):
780 Delete dhcp server process lining in namespace
781 :param vlan: segmentation id
782 :param net_uuid: network uuid
783 :param dhcp_path: conf fiel path that live in namespace side
788 if not self
.is_dhcp_port_free(vlan
, net_uuid
):
791 net_namespace
= 'ovim-' + str(vlan
)
792 dhcp_path
= os
.path
.join(dhcp_path
, net_namespace
)
793 pid_file
= os
.path
.join(dhcp_path
, 'dnsmasq.pid')
795 command
= 'sudo ip netns exec ' + net_namespace
+ ' cat ' + pid_file
796 self
.logger
.debug("command: " + command
)
797 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
798 content
= stdout
.read()
800 command
= 'sudo ip netns exec ' + net_namespace
+ ' kill -9 ' + content
801 self
.logger
.debug("command: " + command
)
802 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
803 content
= stdout
.read()
805 # if len(content) == 0:
809 except paramiko
.ssh_exception
.SSHException
as e
:
810 self
.logger
.error("delete_dhcp_server ssh Exception: " + str(e
))
811 if "SSH session not active" in str(e
):
815 def is_dhcp_port_free(self
, host_id
, net_uuid
):
817 Check if any port attached to the a net in a vxlan mesh across computes nodes
818 :param host_id: host id
819 :param net_uuid: network id
820 :return: True if is not free
822 self
.db_lock
.acquire()
823 result
, content
= self
.db
.get_table(
825 WHERE
={'type': 'instance:ovs', 'net_id': net_uuid
}
827 self
.db_lock
.release()
834 def is_port_free(self
, host_id
, net_uuid
):
836 Check if there not ovs ports of a network in a compute host.
837 :param host_id: host id
838 :param net_uuid: network id
839 :return: True if is not free
842 self
.db_lock
.acquire()
843 result
, content
= self
.db
.get_table(
844 FROM
='ports as p join instances as i on p.instance_id=i.uuid',
845 WHERE
={"i.host_id": self
.host_id
, 'p.type': 'instance:ovs', 'p.net_id': net_uuid
}
847 self
.db_lock
.release()
854 def add_port_to_ovs_bridge(self
, vlan
):
856 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
857 :param vlan: vlan port id
858 :return: True if success
864 port_name
= 'ovim-' + str(vlan
)
865 command
= 'sudo ovs-vsctl add-port br-int ' + port_name
+ ' tag=' + str(vlan
)
866 self
.logger
.debug("command: " + command
)
867 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
868 content
= stdout
.read()
869 if len(content
) == 0:
873 except paramiko
.ssh_exception
.SSHException
as e
:
874 self
.logger
.error("add_port_to_ovs_bridge ssh Exception: " + str(e
))
875 if "SSH session not active" in str(e
):
879 def delete_dhcp_port(self
, vlan
, net_uuid
):
881 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
882 :param vlan: segmentation id
883 :param net_uuid: network id
884 :return: True if success
890 if not self
.is_dhcp_port_free(vlan
, net_uuid
):
892 self
.delete_dhcp_interfaces(vlan
)
895 def delete_bridge_port_attached_to_ovs(self
, vlan
, net_uuid
):
897 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
900 :return: True if success
905 if not self
.is_port_free(vlan
, net_uuid
):
907 self
.delete_port_to_ovs_bridge(vlan
, net_uuid
)
908 self
.delete_linux_bridge(vlan
)
911 def delete_linux_bridge(self
, vlan
):
913 Delete a linux bridge in a scpecific compute.
914 :param vlan: vlan port id
915 :return: True if success
921 port_name
= 'ovim-' + str(vlan
)
922 command
= 'sudo ip link set dev veth0-' + str(vlan
) + ' down'
923 self
.logger
.debug("command: " + command
)
924 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
925 # content = stdout.read()
927 # if len(content) != 0:
929 command
= 'sudo ifconfig ' + port_name
+ ' down && sudo brctl delbr ' + port_name
930 self
.logger
.debug("command: " + command
)
931 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
932 content
= stdout
.read()
933 if len(content
) == 0:
937 except paramiko
.ssh_exception
.SSHException
as e
:
938 self
.logger
.error("delete_linux_bridge ssh Exception: " + str(e
))
939 if "SSH session not active" in str(e
):
943 def create_ovs_bridge_port(self
, vlan
):
945 Generate a linux bridge and attache the port to a OVS bridge
946 :param vlan: vlan port id
951 self
.create_linux_bridge(vlan
)
952 self
.add_port_to_ovs_bridge(vlan
)
954 def create_linux_bridge(self
, vlan
):
956 Create a linux bridge with STP active
957 :param vlan: netowrk vlan id
964 port_name
= 'ovim-' + str(vlan
)
965 command
= 'sudo brctl show | grep ' + port_name
966 self
.logger
.debug("command: " + command
)
967 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
968 content
= stdout
.read()
970 # if exist nothing to create
971 # if len(content) == 0:
974 command
= 'sudo brctl addbr ' + port_name
975 self
.logger
.debug("command: " + command
)
976 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
977 content
= stdout
.read()
979 # if len(content) == 0:
984 command
= 'sudo brctl stp ' + port_name
+ ' on'
985 self
.logger
.debug("command: " + command
)
986 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
987 content
= stdout
.read()
989 # if len(content) == 0:
993 command
= 'sudo ip link set dev ' + port_name
+ ' up'
994 self
.logger
.debug("command: " + command
)
995 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
996 content
= stdout
.read()
998 if len(content
) == 0:
1002 except paramiko
.ssh_exception
.SSHException
as e
:
1003 self
.logger
.error("create_linux_bridge ssh Exception: " + str(e
))
1004 if "SSH session not active" in str(e
):
# Append a "MAC,IP" static-lease line to the dnsmasq hosts file that lives inside
# the per-vlan network namespace, so the given MAC always receives the given IP.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1008 def set_mac_dhcp_server(self
, ip
, mac
, vlan
, netmask
, dhcp_path
):
1010 Write into dhcp conf file a rule to assigned a fixed ip given to an specific MAC address
1011 :param ip: IP address asigned to a VM
1012 :param mac: VM vnic mac to be macthed with the IP received
1013 :param vlan: Segmentation id
1014 :param netmask: netmask value
1015 :param path: dhcp conf file path that live in namespace side
1016 :return: True if success
1022 net_namespace
= 'ovim-' + str(vlan
)
# NOTE(review): dhcp_path and dhcp_hostsdir end up identical — the second join
# reuses the already-joined dhcp_path; looks like a latent copy-paste — TODO confirm.
1023 dhcp_path
= os
.path
.join(dhcp_path
, net_namespace
)
1024 dhcp_hostsdir
= os
.path
.join(dhcp_path
, net_namespace
)
# dnsmasq dhcp-hostsdir entry format: "MAC,IP" (MAC upper-cased).
1029 ip_data
= mac
.upper() + ',' + ip
# Ensure the hosts file exists inside the namespace.
1031 command
= 'sudo ip netns exec ' + net_namespace
+ ' touch ' + dhcp_hostsdir
1032 self
.logger
.debug("command: " + command
)
1033 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1034 content
= stdout
.read()
# Append the lease line.
1036 command
= 'sudo ip netns exec ' + net_namespace
+ ' sudo bash -ec "echo ' + ip_data
+ ' >> ' + dhcp_hostsdir
+ '"'
1038 self
.logger
.debug("command: " + command
)
1039 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1040 content
= stdout
.read()
1042 if len(content
) == 0:
1046 except paramiko
.ssh_exception
.SSHException
as e
:
1047 self
.logger
.error("set_mac_dhcp_server ssh Exception: " + str(e
))
1048 if "SSH session not active" in str(e
):
1052 def delete_mac_dhcp_server(self
, ip
, mac
, vlan
, dhcp_path
):
1054 Delete into dhcp conf file the ip assigned to a specific MAC address
1056 :param ip: IP address asigned to a VM
1057 :param mac: VM vnic mac to be macthed with the IP received
1058 :param vlan: Segmentation id
1059 :param dhcp_path: dhcp conf file path that live in namespace side
1066 net_namespace
= 'ovim-' + str(vlan
)
1067 dhcp_path
= os
.path
.join(dhcp_path
, net_namespace
)
1068 dhcp_hostsdir
= os
.path
.join(dhcp_path
, net_namespace
)
1073 ip_data
= mac
.upper() + ',' + ip
1075 command
= 'sudo ip netns exec ' + net_namespace
+ ' sudo sed -i \'/' + ip_data
+ '/d\' ' + dhcp_hostsdir
1076 self
.logger
.debug("command: " + command
)
1077 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1078 content
= stdout
.read()
1080 if len(content
) == 0:
1085 except paramiko
.ssh_exception
.SSHException
as e
:
1086 self
.logger
.error("set_mac_dhcp_server ssh Exception: " + str(e
))
1087 if "SSH session not active" in str(e
):
# Launch (or verify) a dnsmasq DHCP server inside the per-vlan network namespace,
# serving the given ip_range on the namespace-side tap interface.
# Docstring fixed: it was copy-pasted from the bridge-creation method.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments/docs changed.
1091 def launch_dhcp_server(self
, vlan
, ip_range
, netmask
, dhcp_path
, gateway
):
1093 Launch a dnsmasq dhcp server inside the net namespace of the given vlan
1095 :param vlan: Segmentation id
1096 :param ip_range: IP dhcp range
1097 :param netmask: network netmask
1098 :param dhcp_path: dhcp conf file path that live in namespace side
1099 :param gateway: Gateway address for dhcp net
1100 :return: True if success
1106 interface
= 'tap-' + str(vlan
)
1107 net_namespace
= 'ovim-' + str(vlan
)
1108 dhcp_path
= os
.path
.join(dhcp_path
, net_namespace
)
1109 leases_path
= os
.path
.join(dhcp_path
, "dnsmasq.leases")
1110 pid_file
= os
.path
.join(dhcp_path
, 'dnsmasq.pid')
# dnsmasq --dhcp-range argument: "start,end,netmask".
1112 dhcp_range
= ip_range
[0] + ',' + ip_range
[1] + ',' + netmask
# Make sure the config/lease directory exists inside the namespace.
1114 command
= 'sudo ip netns exec ' + net_namespace
+ ' mkdir -p ' + dhcp_path
1115 self
.logger
.debug("command: " + command
)
1116 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1117 content
= stdout
.read()
# Read any previously recorded dnsmasq pid.
1119 pid_path
= os
.path
.join(dhcp_path
, 'dnsmasq.pid')
1120 command
= 'sudo ip netns exec ' + net_namespace
+ ' cat ' + pid_path
1121 self
.logger
.debug("command: " + command
)
1122 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1123 content
= stdout
.read()
1124 # check if pid is runing
1125 pid_status_path
= content
1127 command
= "ps aux | awk '{print $2 }' | grep " + pid_status_path
1128 self
.logger
.debug("command: " + command
)
1129 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1130 content
= stdout
.read()
# Start dnsmasq bound to the namespace tap interface, with static leases read
# from dhcp_path (--dhcp-hostsdir) and leases/pid files under the same dir.
1132 command
= 'sudo ip netns exec ' + net_namespace
+ ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \
1133 '--interface=' + interface
+ ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path
+ \
1134 ' --dhcp-range ' + dhcp_range
+ ' --pid-file=' + pid_file
+ ' --dhcp-leasefile=' + leases_path
+ \
1135 ' --listen-address ' + gateway
1137 self
.logger
.debug("command: " + command
)
1138 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
# readline() (not read()): dnsmasq daemonizes, a full read could block — TODO confirm.
1139 content
= stdout
.readline()
1141 if len(content
) == 0:
1145 except paramiko
.ssh_exception
.SSHException
as e
:
1146 self
.logger
.error("launch_dhcp_server ssh Exception: " + str(e
))
1147 if "SSH session not active" in str(e
):
# Tear down the DHCP veth plumbing for a vlan: detach the ovs-side veth end from
# br-int and set both veth ends (namespace side and host side) down.
# Docstring fixed: it was copy-pasted from create_linux_bridge.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments/docs changed.
1151 def delete_dhcp_interfaces(self
, vlan
):
1153 Delete the dhcp veth interfaces attached to the net namespace of the given vlan
1154 :param vlan: netowrk vlan id
1161 net_namespace
= 'ovim-' + str(vlan
)
# Remove the ovs-side veth end from the integration bridge.
1162 command
= 'sudo ovs-vsctl del-port br-int ovs-tap-' + str(vlan
)
1163 self
.logger
.debug("command: " + command
)
1164 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1165 content
= stdout
.read()
# Set the namespace-side veth end down.
1167 command
= 'sudo ip netns exec ' + net_namespace
+ ' ip link set dev tap-' + str(vlan
) + ' down'
1168 self
.logger
.debug("command: " + command
)
1169 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1170 content
= stdout
.read()
# Set the host-side veth end down.
1172 command
= 'sudo ip link set dev ovs-tap-' + str(vlan
) + ' down'
1173 self
.logger
.debug("command: " + command
)
1174 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1175 content
= stdout
.read()
1176 except paramiko
.ssh_exception
.SSHException
as e
:
1177 self
.logger
.error("delete_dhcp_interfaces ssh Exception: " + str(e
))
1178 if "SSH session not active" in str(e
):
# Build the per-vlan DHCP plumbing: create a network namespace 'ovim-<vlan>',
# a veth pair (tap-<vlan> / ovs-tap-<vlan>), attach the ovs side to br-int with
# the vlan tag, move the other side into the namespace, bring everything up and
# assign the listen address to the namespace-side interface.
# Docstring fixed: it was copy-pasted from create_linux_bridge.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments/docs changed.
1182 def create_dhcp_interfaces(self
, vlan
, ip_listen_address
, netmask
):
1184 Create the net namespace and veth interfaces used by the dhcp server of a vlan
1185 :param vlan: segmentation id
1186 :param ip_listen_address: Listen Ip address for the dhcp service, the tap interface living in namesapce side
1187 :param netmask: dhcp net CIDR
1188 :return: True if success
1194 net_namespace
= 'ovim-' + str(vlan
)
1195 namespace_interface
= 'tap-' + str(vlan
)
# 1) create the namespace
1197 command
= 'sudo ip netns add ' + net_namespace
1198 self
.logger
.debug("command: " + command
)
1199 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1200 content
= stdout
.read()
# 2) create the veth pair
1202 command
= 'sudo ip link add tap-' + str(vlan
) + ' type veth peer name ovs-tap-' + str(vlan
)
1203 self
.logger
.debug("command: " + command
)
1204 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1205 content
= stdout
.read()
# 3) attach the ovs-side end to br-int tagged with the vlan
1207 command
= 'sudo ovs-vsctl add-port br-int ovs-tap-' + str(vlan
) + ' tag=' + str(vlan
)
1208 self
.logger
.debug("command: " + command
)
1209 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1210 content
= stdout
.read()
# 4) move the tap end into the namespace
1212 command
= 'sudo ip link set tap-' + str(vlan
) + ' netns ' + net_namespace
1213 self
.logger
.debug("command: " + command
)
1214 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1215 content
= stdout
.read()
# 5) bring both ends (and loopback inside the namespace) up
1217 command
= 'sudo ip netns exec ' + net_namespace
+ ' ip link set dev tap-' + str(vlan
) + ' up'
1218 self
.logger
.debug("command: " + command
)
1219 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1220 content
= stdout
.read()
1222 command
= 'sudo ip link set dev ovs-tap-' + str(vlan
) + ' up'
1223 self
.logger
.debug("command: " + command
)
1224 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1225 content
= stdout
.read()
1227 command
= 'sudo ip netns exec ' + net_namespace
+ ' ip link set dev lo up'
1228 self
.logger
.debug("command: " + command
)
1229 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1230 content
= stdout
.read()
# 6) configure the listen address on the namespace-side interface
1232 command
= 'sudo ip netns exec ' + net_namespace
+ ' ' + ' ifconfig ' + namespace_interface \
1233 + ' ' + ip_listen_address
+ ' netmask ' + netmask
1234 self
.logger
.debug("command: " + command
)
1235 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1236 content
= stdout
.read()
1238 if len(content
) == 0:
1242 except paramiko
.ssh_exception
.SSHException
as e
:
1243 self
.logger
.error("create_dhcp_interfaces ssh Exception: " + str(e
))
1244 if "SSH session not active" in str(e
):
# Add a vxlan tunnel port to br-int pointing at a remote compute, with an
# elevated STP path cost so the tunnel is deprioritized by spanning tree.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1249 def create_ovs_vxlan_tunnel(self
, vxlan_interface
, remote_ip
):
1251 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
1252 :param vxlan_interface: vlxan inteface name.
1253 :param remote_ip: tunnel endpoint remote compute ip.
# Single ovs-vsctl transaction: add the port, set its type/remote_ip, set STP cost.
1259 command
= 'sudo ovs-vsctl add-port br-int ' + vxlan_interface
+ \
1260 ' -- set Interface ' + vxlan_interface
+ ' type=vxlan options:remote_ip=' + remote_ip
+ \
1261 ' -- set Port ' + vxlan_interface
+ ' other_config:stp-path-cost=10'
1262 self
.logger
.debug("command: " + command
)
1263 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1264 content
= stdout
.read()
1266 if len(content
) == 0:
1270 except paramiko
.ssh_exception
.SSHException
as e
:
1271 self
.logger
.error("create_ovs_vxlan_tunnel ssh Exception: " + str(e
))
1272 if "SSH session not active" in str(e
):
# Remove a vxlan tunnel port from the OVS integration bridge.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1276 def delete_ovs_vxlan_tunnel(self
, vxlan_interface
):
1278 Delete a vlxan tunnel port from a OVS brdige.
1279 :param vxlan_interface: vlxan name to be delete it.
1280 :return: True if success.
1285 command
= 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
1286 self
.logger
.debug("command: " + command
)
1287 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1288 content
= stdout
.read()
1290 if len(content
) == 0:
1294 except paramiko
.ssh_exception
.SSHException
as e
:
1295 self
.logger
.error("delete_ovs_vxlan_tunnel ssh Exception: " + str(e
))
1296 if "SSH session not active" in str(e
):
# Delete the whole OVS integration bridge (br-int) on this compute.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1300 def delete_ovs_bridge(self
):
1302 Delete a OVS bridge from a compute.
1303 :return: True if success
1308 command
= 'sudo ovs-vsctl del-br br-int'
1309 self
.logger
.debug("command: " + command
)
1310 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1311 content
= stdout
.read()
1312 if len(content
) == 0:
1316 except paramiko
.ssh_exception
.SSHException
as e
:
1317 self
.logger
.error("delete_ovs_bridge ssh Exception: " + str(e
))
1318 if "SSH session not active" in str(e
):
# Stat a remote file via 'ls -lL' with an ISO time style; returns None if the
# file does not exist, otherwise the space-split ls fields
# (permission, 1, owner, group, size, date, file).
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1322 def get_file_info(self
, path
):
1323 command
= 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
1324 self
.logger
.debug("command: " + command
)
1325 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
1326 content
= stdout
.read()
# Empty ls output means the path does not exist.
1327 if len(content
) == 0:
1328 return None # file does not exist
1330 return content
.split(" ") # (permission, 1, owner, group, size, date, file)
# Run 'qemu-img info' on a remote image and return its output parsed as YAML
# (a dict with keys such as 'file format' and 'backing file').
# Raises SSHException if qemu-img produced no stdout (stderr text is attached).
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1332 def qemu_get_info(self
, path
):
1333 command
= 'qemu-img info ' + path
1334 self
.logger
.debug("command: " + command
)
1335 (_
, stdout
, stderr
) = self
.ssh_conn
.exec_command(command
)
1336 content
= stdout
.read()
# No stdout means qemu-img failed; surface its stderr as an SSHException.
1337 if len(content
) == 0:
1338 error
= stderr
.read()
1339 self
.logger
.error("get_qemu_info error " + error
)
1340 raise paramiko
.ssh_exception
.SSHException("Error getting qemu_info: " + error
)
# NOTE(review): yaml.load without an explicit Loader — input is qemu-img output
# (trusted command), but yaml.safe_load would be the safer call; verify before changing.
1343 return yaml
.load(content
)
1344 except yaml
.YAMLError
as exc
:
# On a YAML parse error, include the position mark in the raised message.
1346 if hasattr(exc
, 'problem_mark'):
1347 mark
= exc
.problem_mark
1348 text
= " at position: (%s:%s)" % (mark
.line
+1, mark
.column
+1)
1349 self
.logger
.error("get_qemu_info yaml format Exception " + text
)
1350 raise paramiko
.ssh_exception
.SSHException("Error getting qemu_info yaml format" + text
)
# Repoint an incremental qcow2 image (inc_file) at a new backing file using
# 'qemu-img rebase -u' (unsafe/metadata-only rebase: contents are not rewritten).
# Logs an error if qemu-img wrote anything to stderr.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1352 def qemu_change_backing(self
, inc_file
, new_backing_file
):
1353 command
= 'qemu-img rebase -u -b ' + new_backing_file
+ ' ' + inc_file
1354 self
.logger
.debug("command: " + command
)
1355 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
1356 content
= stderr
.read()
# Empty stderr is treated as success.
1357 if len(content
) == 0:
1360 self
.logger
.error("qemu_change_backing error: " + content
)
1363 def qemu_create_empty_disk(self
, dev
):
1365 if not dev
and 'source' not in dev
and 'file format' not in dev
and 'image_size' not in dev
:
1366 self
.logger
.error("qemu_create_empty_disk error: missing image parameter")
1369 empty_disk_path
= dev
['source file']
1371 command
= 'qemu-img create -f qcow2 ' + empty_disk_path
+ ' ' + str(dev
['image_size']) + 'G'
1372 self
.logger
.debug("command: " + command
)
1373 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
1374 content
= stderr
.read()
1375 if len(content
) == 0:
1378 self
.logger
.error("qemu_create_empty_disk error: " + content
)
# Find a file name that does not yet exist on the host, based on proposed_name:
# insert suffix before the extension and, while a file with that name exists
# (checked via get_file_info over SSH), append "-<index>" until it is free.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1381 def get_notused_filename(self
, proposed_name
, suffix
=''):
1382 '''Look for a non existing file_name in the host
1383 proposed_name: proposed file name, includes path
1384 suffix: suffix to be added to the name, before the extention
# Locate the extension dot; a dot before the last slash belongs to a directory.
1386 extension
= proposed_name
.rfind(".")
1387 slash
= proposed_name
.rfind("/")
1388 if extension
< 0 or extension
< slash
: # no extension
1389 extension
= len(proposed_name
)
1390 target_name
= proposed_name
[:extension
] + suffix
+ proposed_name
[extension
:]
1391 info
= self
.get_file_info(target_name
)
# Keep probing with an increasing index until the name is unused
# (index initialization/increment lines are elided in this extraction).
1396 while info
is not None:
1397 target_name
= proposed_name
[:extension
] + suffix
+ "-" + str(index
) + proposed_name
[extension
:]
1399 info
= self
.get_file_info(target_name
)
# Database analogue of get_notused_filename: find an image path not present in
# the 'images' table, appending "-<index>" before the extension until unused.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1402 def get_notused_path(self
, proposed_path
, suffix
=''):
1403 '''Look for a non existing path at database for images
1404 proposed_path: proposed file name, includes path
1405 suffix: suffix to be added to the name, before the extention
1407 extension
= proposed_path
.rfind(".")
1409 extension
= len(proposed_path
)
1411 target_path
= proposed_path
[:extension
] + suffix
+ proposed_path
[extension
:]
# Query the DB for a collision on the candidate path
# (loop/locking lines are elided in this extraction).
1414 r
,_
=self
.db
.get_table(FROM
="images",WHERE
={"path":target_path
})
1417 target_path
= proposed_path
[:extension
] + suffix
+ "-" + str(index
) + proposed_path
[extension
:]
# Delete a file on the remote host with 'rm -f'; any stderr output is raised
# as an SSHException so callers can react to a failed deletion.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1421 def delete_file(self
, file_name
):
1422 command
= 'rm -f '+file_name
1423 self
.logger
.debug("command: " + command
)
1424 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
1425 error_msg
= stderr
.read()
# rm -f is silent on success, so any stderr text indicates a real error.
1426 if len(error_msg
) > 0:
1427 raise paramiko
.ssh_exception
.SSHException("Error deleting file: " + error_msg
)
# Copy an image to the host: for http(s) sources use wget (capturing its output
# to a temp '.result' file that is surfaced on failure), otherwise 'cp' with
# optional timestamp preservation. Raises SSHException on any stderr output.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
# (typo 'perserve_time' kept — it is part of the public signature.)
1429 def copy_file(self
, source
, destination
, perserve_time
=True):
1430 if source
[0:4]=="http":
1431 command
= "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
1432 dst
=destination
, src
=source
, dst_result
=destination
+ ".result" )
# Local copy path ('else:' line elided in this extraction).
1434 command
= 'cp --no-preserve=mode'
1436 command
+= ' --preserve=timestamps'
1437 command
+= " '{}' '{}'".format(source
, destination
)
1438 self
.logger
.debug("command: " + command
)
1439 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
1440 error_msg
= stderr
.read()
1441 if len(error_msg
) > 0:
1442 raise paramiko
.ssh_exception
.SSHException("Error copying image to local host: " + error_msg
)
# Bring a repository image onto this host, recursively copying qcow2 backing
# files first, and reuse/validate any cached copy recorded in
# self.localinfo['files']. Returns (local_file, qemu_info, use_incremental_out).
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1444 def copy_remote_file(self
, remote_file
, use_incremental
):
1445 ''' Copy a file from the repository to local folder and recursively
1446 copy the backing files in case the remote file is incremental
1447 Read and/or modified self.localinfo['files'] that contain the
1448 unmodified copies of images in the local path
1450 remote_file: path of remote file
1451 use_incremental: None (leave the decision to this function), True, False
1453 local_file: name of local file
1454 qemu_info: dict with quemu information of local file
1455 use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
1458 use_incremental_out
= use_incremental
1459 new_backing_file
= None
1461 file_from_local
= True
1463 #in case incremental use is not decided, take the decision depending on the image
1464 #avoid the use of incremental if this image is already incremental
1465 if remote_file
[0:4] == "http":
1466 file_from_local
= False
1468 qemu_remote_info
= self
.qemu_get_info(remote_file
)
1469 if use_incremental_out
==None:
1470 use_incremental_out
= not ( file_from_local
and 'backing file' in qemu_remote_info
)
1471 #copy recursivelly the backing files
1472 if file_from_local
and 'backing file' in qemu_remote_info
:
1473 new_backing_file
, _
, _
= self
.copy_remote_file(qemu_remote_info
['backing file'], True)
1475 #check if remote file is present locally
1476 if use_incremental_out
and remote_file
in self
.localinfo
['files']:
1477 local_file
= self
.localinfo
['files'][remote_file
]
1478 local_file_info
= self
.get_file_info(local_file
)
1480 remote_file_info
= self
.get_file_info(remote_file
)
# Cached copy validation: size (field 4) and date (field 5) must match the source.
1481 if local_file_info
== None:
1483 elif file_from_local
and (local_file_info
[4]!=remote_file_info
[4] or local_file_info
[5]!=remote_file_info
[5]):
1484 #local copy of file not valid because date or size are different.
1485 #TODO DELETE local file if this file is not used by any active virtual machine
1487 self
.delete_file(local_file
)
1488 del self
.localinfo
['files'][remote_file
]
1492 else: #check that the local file has the same backing file, or there are not backing at all
1493 qemu_info
= self
.qemu_get_info(local_file
)
1494 if new_backing_file
!= qemu_info
.get('backing file'):
# No (valid) cached copy: download/copy to a fresh name under image_path.
1498 if local_file
== None: #copy the file
1499 img_name
= remote_file
.split('/') [-1]
1500 img_local
= self
.image_path
+ '/' + img_name
1501 local_file
= self
.get_notused_filename(img_local
)
1502 self
.copy_file(remote_file
, local_file
, use_incremental_out
)
# Record the pristine copy and rebase it onto the locally-copied backing file.
1504 if use_incremental_out
:
1505 self
.localinfo
['files'][remote_file
] = local_file
1506 if new_backing_file
:
1507 self
.qemu_change_backing(local_file
, new_backing_file
)
1508 qemu_info
= self
.qemu_get_info(local_file
)
1510 return local_file
, qemu_info
, use_incremental_out
# Start (or rebuild) a VM on this host: load the instance from DB, prepare all
# disk devices (empty disks and copies of repository images, optionally with an
# incremental qcow2 overlay), generate the libvirt XML and createXML() it.
# Returns (result, data); negative result on error.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1512 def launch_server(self
, conn
, server
, rebuild
=False, domain
=None):
# Test mode: simulate work with a random delay (test-mode branch lines elided).
1514 time
.sleep(random
.randint(20,150)) #sleep random timeto be make it a bit more real
1517 server_id
= server
['uuid']
1518 paused
= server
.get('paused','no')
# An existing domain and no rebuild request means nothing to do.
1520 if domain
!=None and rebuild
==False:
1522 #self.server_status[server_id] = 'ACTIVE'
# Fetch the full instance row under the DB lock.
1525 self
.db_lock
.acquire()
1526 result
, server_data
= self
.db
.get_instance(server_id
)
1527 self
.db_lock
.release()
1529 self
.logger
.error("launch_server ERROR getting server from DB %d %s", result
, server_data
)
1530 return result
, server_data
1532 #0: get image metadata
1533 server_metadata
= server
.get('metadata', {})
1534 use_incremental
= None
1536 if "use_incremental" in server_metadata
:
1537 use_incremental
= False if server_metadata
["use_incremental"] == "no" else True
1539 server_host_files
= self
.localinfo
['server_files'].get( server
['uuid'], {})
1541 #delete previous incremental files
1542 for file_
in server_host_files
.values():
1543 self
.delete_file(file_
['source file'] )
1544 server_host_files
={}
1546 #1: obtain aditional devices (disks)
1547 #Put as first device the main disk
1548 devices
= [ {"type":"disk", "image_id":server
['image_id'], "vpci":server_metadata
.get('vpci', None) } ]
1549 if 'extended' in server_data
and server_data
['extended']!=None and "devices" in server_data
['extended']:
1550 devices
+= server_data
['extended']['devices']
# Per-device preparation (loop header lines elided in this extraction).
1553 image_id
= dev
.get('image_id')
# Device without image: create a brand-new empty qcow2 disk keyed by a fresh uuid.
1556 uuid_empty
= str(uuid
.uuid4())
1557 empty_path
= self
.empty_image_path
+ uuid_empty
+ '.qcow2' # local path for empty disk
1559 dev
['source file'] = empty_path
1560 dev
['file format'] = 'qcow2'
1561 self
.qemu_create_empty_disk(dev
)
1562 server_host_files
[uuid_empty
] = {'source file': empty_path
,
1563 'file format': dev
['file format']}
# Device with image: look the image path/metadata up in the DB.
1567 self
.db_lock
.acquire()
1568 result
, content
= self
.db
.get_table(FROM
='images', SELECT
=('path', 'metadata'),
1569 WHERE
={'uuid': image_id
})
1570 self
.db_lock
.release()
1572 error_text
= "ERROR", result
, content
, "when getting image", dev
['image_id']
1573 self
.logger
.error("launch_server " + error_text
)
1574 return -1, error_text
1575 if content
[0]['metadata'] is not None:
1576 dev
['metadata'] = json
.loads(content
[0]['metadata'])
1578 dev
['metadata'] = {}
# Image already present on this host: reuse the local copy.
1580 if image_id
in server_host_files
:
1581 dev
['source file'] = server_host_files
[image_id
]['source file'] #local path
1582 dev
['file format'] = server_host_files
[image_id
]['file format'] # raw or qcow2
1585 #2: copy image to host
1587 remote_file
= content
[0]['path']
1589 remote_file
= empty_path
1590 use_incremental_image
= use_incremental
1591 if dev
['metadata'].get("use_incremental") == "no":
1592 use_incremental_image
= False
1593 local_file
, qemu_info
, use_incremental_image
= self
.copy_remote_file(remote_file
, use_incremental_image
)
1595 #create incremental image
1596 if use_incremental_image
:
1597 local_file_inc
= self
.get_notused_filename(local_file
, '.inc')
1598 command
= 'qemu-img create -f qcow2 '+local_file_inc
+ ' -o backing_file='+ local_file
1599 self
.logger
.debug("command: " + command
)
1600 (_
, _
, stderr
) = self
.ssh_conn
.exec_command(command
)
1601 error_msg
= stderr
.read()
1602 if len(error_msg
) > 0:
1603 raise paramiko
.ssh_exception
.SSHException("Error creating incremental file: " + error_msg
)
1604 local_file
= local_file_inc
1605 qemu_info
= {'file format':'qcow2'}
1607 server_host_files
[ dev
['image_id'] ] = {'source file': local_file
, 'file format': qemu_info
['file format']}
1609 dev
['source file'] = local_file
1610 dev
['file format'] = qemu_info
['file format']
# Persist the per-server file bookkeeping (flushed later by the thread loop).
1612 self
.localinfo
['server_files'][ server
['uuid'] ] = server_host_files
1613 self
.localinfo_dirty
= True
# 3: generate the libvirt XML and launch the domain.
1616 result
, xml
= self
.create_xml_server(server_data
, devices
, server_metadata
) #local_file
1618 self
.logger
.error("create xml server error: " + xml
)
1620 self
.logger
.debug("create xml: " + xml
)
1621 atribute
= host_thread
.lvirt_module
.VIR_DOMAIN_START_PAUSED
if paused
== "yes" else 0
1623 if not rebuild
: #ensures that any pending destroying server is done
1624 self
.server_forceoff(True)
1625 #self.logger.debug("launching instance " + xml)
1626 conn
.createXML(xml
, atribute
)
1627 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
# Error handling: reconnect on dead SSH sessions; log libvirt/other failures.
1631 except paramiko
.ssh_exception
.SSHException
as e
:
1633 self
.logger
.error("launch_server id='%s' ssh Exception: %s", server_id
, text
)
1634 if "SSH session not active" in text
:
1636 except host_thread
.lvirt_module
.libvirtError
as e
:
1637 text
= e
.get_error_message()
1638 self
.logger
.error("launch_server id='%s' libvirt Exception: %s", server_id
, text
)
1639 except Exception as e
:
1641 self
.logger
.error("launch_server id='%s' Exception: %s", server_id
, text
)
# Poll libvirt for the state of every known domain on this host, map libvirt
# states to openvim statuses (ACTIVE/PAUSED/INACTIVE/ERROR) and write any status
# change back to the 'instances' table and to self.server_status.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1644 def update_servers_status(self
):
1646 # VIR_DOMAIN_NOSTATE = 0
1647 # VIR_DOMAIN_RUNNING = 1
1648 # VIR_DOMAIN_BLOCKED = 2
1649 # VIR_DOMAIN_PAUSED = 3
1650 # VIR_DOMAIN_SHUTDOWN = 4
1651 # VIR_DOMAIN_SHUTOFF = 5
1652 # VIR_DOMAIN_CRASHED = 6
1653 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended
# Nothing to do in test mode or when no server is tracked.
1655 if self
.test
or len(self
.server_status
)==0:
1659 conn
= host_thread
.lvirt_module
.open(self
.lvirt_conn_uri
)
1660 domains
= conn
.listAllDomains()
1662 for domain
in domains
:
1663 uuid
= domain
.UUIDString() ;
1664 libvirt_status
= domain
.state()
1665 #print libvirt_status
# Map libvirt domain states onto openvim statuses.
1666 if libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_RUNNING
or libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_SHUTDOWN
:
1667 new_status
= "ACTIVE"
1668 elif libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_PAUSED
:
1669 new_status
= "PAUSED"
1670 elif libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_SHUTOFF
:
1671 new_status
= "INACTIVE"
1672 elif libvirt_status
[0] == host_thread
.lvirt_module
.VIR_DOMAIN_CRASHED
:
1673 new_status
= "ERROR"
1676 domain_dict
[uuid
] = new_status
1678 except host_thread
.lvirt_module
.libvirtError
as e
:
1679 self
.logger
.error("get_state() Exception " + e
.get_error_message())
# Python 2 dict iteration (iteritems) — file-wide py2 convention.
1682 for server_id
, current_status
in self
.server_status
.iteritems():
1684 if server_id
in domain_dict
:
1685 new_status
= domain_dict
[server_id
]
1687 new_status
= "INACTIVE"
1689 if new_status
== None or new_status
== current_status
:
1691 if new_status
== 'INACTIVE' and current_status
== 'ERROR':
1692 continue #keep ERROR status, because obviously this machine is not running
# Persist the status change under the DB lock.
1694 self
.logger
.debug("server id='%s' status change from '%s' to '%s'", server_id
, current_status
, new_status
)
1695 STATUS
={'progress':100, 'status':new_status
}
1696 if new_status
== 'ERROR':
1697 STATUS
['last_error'] = 'machine has crashed'
1698 self
.db_lock
.acquire()
1699 r
,_
= self
.db
.update_rows('instances', STATUS
, {'uuid':server_id
}, log
=False)
1700 self
.db_lock
.release()
1702 self
.server_status
[server_id
] = new_status
# Dispatch a lifecycle action (terminate/shutoff/forceOff/start/resume/pause/
# reboot/rebuild/createImage) on the libvirt domain of req['uuid'], then update
# the instance status in the DB and in self.server_status.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1704 def action_on_server(self
, req
, last_retry
=True):
1705 '''Perform an action on a req
1707 req: dictionary that contain:
1708 server properties: 'uuid','name','tenant_id','status'
1710 host properties: 'user', 'ip_name'
1711 return (error, text)
1712 0: No error. VM is updated to new state,
1713 -1: Invalid action, as trying to pause a PAUSED VM
1714 -2: Error accessing host
1716 -4: Error at DB access
1717 -5: Error while trying to perform action. VM is updated to ERROR
1719 server_id
= req
['uuid']
1722 old_status
= req
['status']
# Test-mode branch: compute the resulting status without touching libvirt.
1726 if 'terminate' in req
['action']:
1727 new_status
= 'deleted'
1728 elif 'shutoff' in req
['action'] or 'shutdown' in req
['action'] or 'forceOff' in req
['action']:
1729 if req
['status']!='ERROR':
1731 new_status
= 'INACTIVE'
1732 elif 'start' in req
['action'] and req
['status']!='ERROR':
1733 new_status
= 'ACTIVE'
1734 elif 'resume' in req
['action'] and req
['status']!='ERROR' and req
['status']!='INACTIVE':
1735 new_status
= 'ACTIVE'
1736 elif 'pause' in req
['action'] and req
['status']!='ERROR':
1737 new_status
= 'PAUSED'
1738 elif 'reboot' in req
['action'] and req
['status']!='ERROR':
1739 new_status
= 'ACTIVE'
1740 elif 'rebuild' in req
['action']:
1741 time
.sleep(random
.randint(20,150))
1742 new_status
= 'ACTIVE'
1743 elif 'createImage' in req
['action']:
1745 self
.create_image(None, req
)
# Real mode: open a libvirt connection and look the domain up by uuid.
1748 conn
= host_thread
.lvirt_module
.open(self
.lvirt_conn_uri
)
1750 dom
= conn
.lookupByUUIDString(server_id
)
1751 except host_thread
.lvirt_module
.libvirtError
as e
:
1752 text
= e
.get_error_message()
# "not found" (including the Spanish libvirt message) is tolerated: dom stays None.
1753 if 'LookupByUUIDString' in text
or 'Domain not found' in text
or 'No existe un dominio coincidente' in text
:
1756 self
.logger
.error("action_on_server id='%s' libvirt exception: %s", server_id
, text
)
# forceOff: hard destroy of a running domain.
1759 if 'forceOff' in req
['action']:
1761 self
.logger
.debug("action_on_server id='%s' domain not running", server_id
)
1764 self
.logger
.debug("sending DESTROY to server id='%s'", server_id
)
1766 except Exception as e
:
1767 if "domain is not running" not in e
.get_error_message():
1768 self
.logger
.error("action_on_server id='%s' Exception while sending force off: %s",
1769 server_id
, e
.get_error_message())
1770 last_error
= 'action_on_server Exception while destroy: ' + e
.get_error_message()
1771 new_status
= 'ERROR'
# terminate: destroy (force) or shutdown with a 10s deferred kill.
1773 elif 'terminate' in req
['action']:
1775 self
.logger
.debug("action_on_server id='%s' domain not running", server_id
)
1776 new_status
= 'deleted'
1779 if req
['action']['terminate'] == 'force':
1780 self
.logger
.debug("sending DESTROY to server id='%s'", server_id
)
1782 new_status
= 'deleted'
1784 self
.logger
.debug("sending SHUTDOWN to server id='%s'", server_id
)
1786 self
.pending_terminate_server
.append( (time
.time()+10,server_id
) )
1787 except Exception as e
:
1788 self
.logger
.error("action_on_server id='%s' Exception while destroy: %s",
1789 server_id
, e
.get_error_message())
1790 last_error
= 'action_on_server Exception while destroy: ' + e
.get_error_message()
1791 new_status
= 'ERROR'
1792 if "domain is not running" in e
.get_error_message():
1795 new_status
= 'deleted'
1797 self
.logger
.error("action_on_server id='%s' Exception while undefine: %s",
1798 server_id
, e
.get_error_message())
1799 last_error
= 'action_on_server Exception2 while undefine:', e
.get_error_message()
1800 #Exception: 'virDomainDetachDevice() failed'
# Once deleted, drop tracked status and remove the server's local image files.
1801 if new_status
=='deleted':
1802 if server_id
in self
.server_status
:
1803 del self
.server_status
[server_id
]
1804 if req
['uuid'] in self
.localinfo
['server_files']:
1805 for file_
in self
.localinfo
['server_files'][ req
['uuid'] ].values():
1807 self
.delete_file(file_
['source file'])
1810 del self
.localinfo
['server_files'][ req
['uuid'] ]
1811 self
.localinfo_dirty
= True
1813 elif 'shutoff' in req
['action'] or 'shutdown' in req
['action']:
1816 self
.logger
.debug("action_on_server id='%s' domain not running", server_id
)
1819 # new_status = 'INACTIVE'
1820 #TODO: check status for changing at database
1821 except Exception as e
:
1822 new_status
= 'ERROR'
1823 self
.logger
.error("action_on_server id='%s' Exception while shutdown: %s",
1824 server_id
, e
.get_error_message())
1825 last_error
= 'action_on_server Exception while shutdown: ' + e
.get_error_message()
# rebuild / start both delegate to launch_server.
1827 elif 'rebuild' in req
['action']:
1830 r
= self
.launch_server(conn
, req
, True, None)
1832 new_status
= 'ERROR'
1835 new_status
= 'ACTIVE'
1836 elif 'start' in req
['action']:
1837 # The instance is only create in DB but not yet at libvirt domain, needs to be create
1838 rebuild
= True if req
['action']['start'] == 'rebuild' else False
1839 r
= self
.launch_server(conn
, req
, rebuild
, dom
)
1841 new_status
= 'ERROR'
1844 new_status
= 'ACTIVE'
1846 elif 'resume' in req
['action']:
1852 # new_status = 'ACTIVE'
1853 except Exception as e
:
1854 self
.logger
.error("action_on_server id='%s' Exception while resume: %s",
1855 server_id
, e
.get_error_message())
1857 elif 'pause' in req
['action']:
1863 # new_status = 'PAUSED'
1864 except Exception as e
:
1865 self
.logger
.error("action_on_server id='%s' Exception while pause: %s",
1866 server_id
, e
.get_error_message())
1868 elif 'reboot' in req
['action']:
1874 self
.logger
.debug("action_on_server id='%s' reboot:", server_id
)
1875 #new_status = 'ACTIVE'
1876 except Exception as e
:
1877 self
.logger
.error("action_on_server id='%s' Exception while reboot: %s",
1878 server_id
, e
.get_error_message())
1879 elif 'createImage' in req
['action']:
1880 self
.create_image(dom
, req
)
1884 except host_thread
.lvirt_module
.libvirtError
as e
:
1885 if conn
is not None: conn
.close()
1886 text
= e
.get_error_message()
1887 new_status
= "ERROR"
1889 if 'LookupByUUIDString' in text
or 'Domain not found' in text
or 'No existe un dominio coincidente' in text
:
1890 self
.logger
.debug("action_on_server id='%s' Exception removed from host", server_id
)
1892 self
.logger
.error("action_on_server id='%s' Exception %s", server_id
, text
)
1893 #end of if self.test
# Final bookkeeping: write the resulting status to the DB (with panic logging
# when a terminate ultimately failed after the last retry).
1894 if new_status
== None:
1897 self
.logger
.debug("action_on_server id='%s' new status=%s %s",server_id
, new_status
, last_error
)
1898 UPDATE
= {'progress':100, 'status':new_status
}
1900 if new_status
=='ERROR':
1901 if not last_retry
: #if there will be another retry do not update database
1903 elif 'terminate' in req
['action']:
1904 #PUT a log in the database
1905 self
.logger
.error("PANIC deleting server id='%s' %s", server_id
, last_error
)
1906 self
.db_lock
.acquire()
1907 self
.db
.new_row('logs',
1908 {'uuid':server_id
, 'tenant_id':req
['tenant_id'], 'related':'instances','level':'panic',
1909 'description':'PANIC deleting server from host '+self
.name
+': '+last_error
}
1911 self
.db_lock
.release()
1912 if server_id
in self
.server_status
:
1913 del self
.server_status
[server_id
]
1916 UPDATE
['last_error'] = last_error
1917 if new_status
!= 'deleted' and (new_status
!= old_status
or new_status
== 'ERROR') :
1918 self
.db_lock
.acquire()
1919 self
.db
.update_rows('instances', UPDATE
, {'uuid':server_id
}, log
=True)
1920 self
.server_status
[server_id
] = new_status
1921 self
.db_lock
.release()
1922 if new_status
== 'ERROR':
# Restore an interface to its default configuration (ifdown/ifup cycle) via the
# libvirt interface API, looked up by MAC address; reuses lib_conn if supplied.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1927 def restore_iface(self
, name
, mac
, lib_conn
=None):
1928 ''' make an ifdown, ifup to restore default parameter of na interface
1930 mac: mac address of the interface
1931 lib_conn: connection to the libvirt, if None a new connection is created
1932 Return 0,None if ok, -1,text if fails
1938 self
.logger
.debug("restore_iface '%s' %s", name
, mac
)
# Open a libvirt connection only when the caller did not supply one.
1942 conn
= host_thread
.lvirt_module
.open(self
.lvirt_conn_uri
)
1946 #wait to the pending VM deletion
1947 #TODO.Revise self.server_forceoff(True)
1949 iface
= conn
.interfaceLookupByMACString(mac
)
1950 if iface
.isActive():
1953 self
.logger
.debug("restore_iface '%s' %s", name
, mac
)
1954 except host_thread
.lvirt_module
.libvirtError
as e
:
1955 error_text
= e
.get_error_message()
1956 self
.logger
.error("restore_iface '%s' '%s' libvirt exception: %s", name
, mac
, error_text
)
# Close the connection only if this method opened it.
1959 if lib_conn
is None and conn
is not None:
1961 return ret
, error_text
# Snapshot a server disk into a new image: copy the server's local image file to
# a free destination path, re-point its backing file to the repository original
# if it was incremental, and record the result in the 'images' table.
# NOTE(review): extraction-garbled line layout preserved verbatim; only comments added.
1964 def create_image(self
,dom
, req
):
# Test-mode branch: just compute the destination path and mark ACTIVE.
1966 if 'path' in req
['action']['createImage']:
1967 file_dst
= req
['action']['createImage']['path']
1969 createImage
=req
['action']['createImage']
1970 img_name
= createImage
['source']['path']
1971 index
=img_name
.rfind('/')
1972 file_dst
= self
.get_notused_path(img_name
[:index
+1] + createImage
['name'] + '.qcow2')
1973 image_status
='ACTIVE'
# Real mode: locate the server's local source file for the requested image.
1977 server_id
= req
['uuid']
1978 createImage
=req
['action']['createImage']
1979 file_orig
= self
.localinfo
['server_files'][server_id
] [ createImage
['source']['image_id'] ] ['source file']
1980 if 'path' in req
['action']['createImage']:
1981 file_dst
= req
['action']['createImage']['path']
1983 img_name
= createImage
['source']['path']
1984 index
=img_name
.rfind('/')
1985 file_dst
= self
.get_notused_filename(img_name
[:index
+1] + createImage
['name'] + '.qcow2')
1987 self
.copy_file(file_orig
, file_dst
)
1988 qemu_info
= self
.qemu_get_info(file_orig
)
# If the copied file was incremental, rebase it onto the repository-side
# original of its backing file (reverse lookup in localinfo['files']).
1989 if 'backing file' in qemu_info
:
1990 for k
,v
in self
.localinfo
['files'].items():
1991 if v
==qemu_info
['backing file']:
1992 self
.qemu_change_backing(file_dst
, k
)
1994 image_status
='ACTIVE'
1996 except paramiko
.ssh_exception
.SSHException
as e
:
1997 image_status
='ERROR'
1998 error_text
= e
.args
[0]
1999 self
.logger
.error("create_image id='%s' ssh Exception: %s", server_id
, error_text
)
2000 if "SSH session not active" in error_text
and retry
==0:
2002 except Exception as e
:
2003 image_status
='ERROR'
2005 self
.logger
.error("create_image id='%s' Exception: %s", server_id
, error_text
)
2007 #TODO insert a last_error at database
# Record the outcome (status, progress, final path) on the new image row.
2008 self
.db_lock
.acquire()
2009 self
.db
.update_rows('images', {'status':image_status
, 'progress': 100, 'path':file_dst
},
2010 {'uuid':req
['new_image']['uuid']}, log
=True)
2011 self
.db_lock
.release()
def edit_iface(self, port_id, old_net, new_net):
    """Re-plug an SR-IOV (VF) interface so it picks up proper parameters.

    This action implies removing and inserting the interface to put proper
    parameters (e.g. the vlan tag of the new network).

    Params:
        port_id: uuid of the port to edit
        old_net: previous network uuid; when set, the interface is detached
        new_net: new network uuid; when set, the interface is re-attached
    Returns None; errors are only logged.
    """
    if self.test:
        # test/fake mode: just simulate some work
        time.sleep(1)
        return
    # get iface details
    self.db_lock.acquire()
    r, c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                             WHERE={'port_id': port_id})
    self.db_lock.release()
    if r < 0:
        self.logger.error("edit_iface %s DDBB error: %s", port_id, c)
        return
    elif r == 0:
        self.logger.error("edit_iface %s port not found", port_id)
        return
    port = c[0]
    # only SR-IOV virtual functions can be hot re-plugged this way
    if port["model"] != "VF":
        self.logger.error("edit_iface %s ERROR model must be VF", port_id)
        return
    # create xml detach file
    xml = []
    self.xml_level = 2  # presumably controls pci2xml() output indentation -- TODO confirm
    xml.append("<interface type='hostdev' managed='yes'>")
    xml.append("  <mac address='" + port['mac'] + "'/>")
    xml.append("  <source>" + self.pci2xml(port['pci']) + "\n  </source>")
    xml.append('</interface>')

    conn = None
    try:
        conn = host_thread.lvirt_module.open(self.lvirt_conn_uri)
        dom = conn.lookupByUUIDString(port["instance_id"])
        if old_net:
            text = "\n".join(xml)
            self.logger.debug("edit_iface detaching SRIOV interface " + text)
            dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
        if new_net:
            # rebuild the tail of the xml with the new network's vlan tag
            xml[-1] = "  <vlan>   <tag id='" + str(port['vlan']) + "'/>   </vlan>"
            self.xml_level = 1
            xml.append(self.pci2xml(port.get('vpci', None)))
            xml.append('</interface>')
            text = "\n".join(xml)
            self.logger.debug("edit_iface attaching SRIOV interface " + text)
            dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
    except host_thread.lvirt_module.libvirtError as e:
        text = e.get_error_message()
        self.logger.error("edit_iface %s libvirt exception: %s", port["instance_id"], text)
    finally:
        if conn is not None:
            conn.close()
2067 def create_server(server
, db
, db_lock
, only_of_ports
):
2068 extended
= server
.get('extended', None)
2070 requirements
['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
2071 requirements
['ram'] = server
['flavor'].get('ram', 0)
2072 if requirements
['ram']== None:
2073 requirements
['ram'] = 0
2074 requirements
['vcpus'] = server
['flavor'].get('vcpus', 0)
2075 if requirements
['vcpus']== None:
2076 requirements
['vcpus'] = 0
2077 #If extended is not defined get requirements from flavor
2078 if extended
is None:
2079 #If extended is defined in flavor convert to dictionary and use it
2080 if 'extended' in server
['flavor'] and server
['flavor']['extended'] != None:
2081 json_acceptable_string
= server
['flavor']['extended'].replace("'", "\"")
2082 extended
= json
.loads(json_acceptable_string
)
2085 #print json.dumps(extended, indent=4)
2087 #For simplicity only one numa VM are supported in the initial implementation
2088 if extended
!= None:
2089 numas
= extended
.get('numas', [])
2091 return (-2, "Multi-NUMA VMs are not supported yet")
2093 # return (-1, "At least one numa must be specified")
2095 #a for loop is used in order to be ready to multi-NUMA VMs
2099 numa_req
['memory'] = numa
.get('memory', 0)
2101 numa_req
['proc_req_nb'] = numa
['cores'] #number of cores or threads to be reserved
2102 numa_req
['proc_req_type'] = 'cores' #indicates whether cores or threads must be reserved
2103 numa_req
['proc_req_list'] = numa
.get('cores-id', None) #list of ids to be assigned to the cores or threads
2104 elif 'paired-threads' in numa
:
2105 numa_req
['proc_req_nb'] = numa
['paired-threads']
2106 numa_req
['proc_req_type'] = 'paired-threads'
2107 numa_req
['proc_req_list'] = numa
.get('paired-threads-id', None)
2108 elif 'threads' in numa
:
2109 numa_req
['proc_req_nb'] = numa
['threads']
2110 numa_req
['proc_req_type'] = 'threads'
2111 numa_req
['proc_req_list'] = numa
.get('threads-id', None)
2113 numa_req
['proc_req_nb'] = 0 # by default
2114 numa_req
['proc_req_type'] = 'threads'
2118 #Generate a list of sriov and another for physical interfaces
2119 interfaces
= numa
.get('interfaces', [])
2122 for iface
in interfaces
:
2123 iface
['bandwidth'] = int(iface
['bandwidth'])
2124 if iface
['dedicated'][:3]=='yes':
2125 port_list
.append(iface
)
2127 sriov_list
.append(iface
)
2129 #Save lists ordered from more restrictive to less bw requirements
2130 numa_req
['sriov_list'] = sorted(sriov_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
2131 numa_req
['port_list'] = sorted(port_list
, key
=lambda k
: k
['bandwidth'], reverse
=True)
2134 request
.append(numa_req
)
2136 # print "----------\n"+json.dumps(request[0], indent=4)
2137 # print '----------\n\n'
2139 #Search in db for an appropriate numa for each requested numa
2140 #at the moment multi-NUMA VMs are not supported
2142 requirements
['numa'].update(request
[0])
2143 if requirements
['numa']['memory']>0:
2144 requirements
['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
2145 elif requirements
['ram']==0:
2146 return (-1, "Memory information not set neither at extended field not at ram")
2147 if requirements
['numa']['proc_req_nb']>0:
2148 requirements
['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
2149 elif requirements
['vcpus']==0:
2150 return (-1, "Processor information not set neither at extended field not at vcpus")
2154 result
, content
= db
.get_numas(requirements
, server
.get('host_id', None), only_of_ports
)
2158 return (-1, content
)
2160 numa_id
= content
['numa_id']
2161 host_id
= content
['host_id']
2163 #obtain threads_id and calculate pinning
2166 if requirements
['numa']['proc_req_nb']>0:
2168 result
, content
= db
.get_table(FROM
='resources_core',
2169 SELECT
=('id','core_id','thread_id'),
2170 WHERE
={'numa_id':numa_id
,'instance_id': None, 'status':'ok'} )
2176 #convert rows to a dictionary indexed by core_id
2179 if not row
['core_id'] in cores_dict
:
2180 cores_dict
[row
['core_id']] = []
2181 cores_dict
[row
['core_id']].append([row
['thread_id'],row
['id']])
2183 #In case full cores are requested
2185 if requirements
['numa']['proc_req_type'] == 'cores':
2186 #Get/create the list of the vcpu_ids
2187 vcpu_id_list
= requirements
['numa']['proc_req_list']
2188 if vcpu_id_list
== None:
2189 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
2191 for threads
in cores_dict
.itervalues():
2193 if len(threads
) != 2:
2196 #set pinning for the first thread
2197 cpu_pinning
.append( [ vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1] ] )
2199 #reserve so it is not used the second thread
2200 reserved_threads
.append(threads
[1][1])
2202 if len(vcpu_id_list
) == 0:
2205 #In case paired threads are requested
2206 elif requirements
['numa']['proc_req_type'] == 'paired-threads':
2208 #Get/create the list of the vcpu_ids
2209 if requirements
['numa']['proc_req_list'] != None:
2211 for pair
in requirements
['numa']['proc_req_list']:
2213 return -1, "Field paired-threads-id not properly specified"
2215 vcpu_id_list
.append(pair
[0])
2216 vcpu_id_list
.append(pair
[1])
2218 vcpu_id_list
= range(0,2*int(requirements
['numa']['proc_req_nb']))
2220 for threads
in cores_dict
.itervalues():
2222 if len(threads
) != 2:
2224 #set pinning for the first thread
2225 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
2227 #set pinning for the second thread
2228 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
2230 if len(vcpu_id_list
) == 0:
2233 #In case normal threads are requested
2234 elif requirements
['numa']['proc_req_type'] == 'threads':
2235 #Get/create the list of the vcpu_ids
2236 vcpu_id_list
= requirements
['numa']['proc_req_list']
2237 if vcpu_id_list
== None:
2238 vcpu_id_list
= range(0,int(requirements
['numa']['proc_req_nb']))
2240 for threads_index
in sorted(cores_dict
, key
=lambda k
: len(cores_dict
[k
])):
2241 threads
= cores_dict
[threads_index
]
2242 #set pinning for the first thread
2243 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[0][0], threads
[0][1]])
2245 #if exists, set pinning for the second thread
2246 if len(threads
) == 2 and len(vcpu_id_list
) != 0:
2247 cpu_pinning
.append([vcpu_id_list
.pop(0), threads
[1][0], threads
[1][1]])
2249 if len(vcpu_id_list
) == 0:
2252 #Get the source pci addresses for the selected numa
2253 used_sriov_ports
= []
2254 for port
in requirements
['numa']['sriov_list']:
2256 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
2262 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
2264 port
['pci'] = row
['pci']
2265 if 'mac_address' not in port
:
2266 port
['mac_address'] = row
['mac']
2268 port
['port_id']=row
['id']
2269 port
['Mbps_used'] = port
['bandwidth']
2270 used_sriov_ports
.append(row
['id'])
2273 for port
in requirements
['numa']['port_list']:
2274 port
['Mbps_used'] = None
2275 if port
['dedicated'] != "yes:sriov":
2276 port
['mac_address'] = port
['mac']
2280 result
, content
= db
.get_table(FROM
='resources_port', SELECT
=('id', 'pci', 'mac', 'Mbps'),WHERE
={'numa_id':numa_id
,'root_id': port
['port_id'], 'port_id': None, 'Mbps_used': 0} )
2285 port
['Mbps_used'] = content
[0]['Mbps']
2287 if row
['id'] in used_sriov_ports
or row
['id']==port
['port_id']:
2289 port
['pci'] = row
['pci']
2290 if 'mac_address' not in port
:
2291 port
['mac_address'] = row
['mac'] # mac cannot be set to passthrough ports
2293 port
['port_id']=row
['id']
2294 used_sriov_ports
.append(row
['id'])
2297 # print '2. Physical ports assignation:'+json.dumps(requirements['port_list'], indent=4)
2298 # print '2. SR-IOV assignation:'+json.dumps(requirements['sriov_list'], indent=4)
2300 server
['host_id'] = host_id
2302 #Generate dictionary for saving in db the instance resources
2304 resources
['bridged-ifaces'] = []
2307 numa_dict
['interfaces'] = []
2309 numa_dict
['interfaces'] += requirements
['numa']['port_list']
2310 numa_dict
['interfaces'] += requirements
['numa']['sriov_list']
2312 #Check bridge information
2313 unified_dataplane_iface
=[]
2314 unified_dataplane_iface
+= requirements
['numa']['port_list']
2315 unified_dataplane_iface
+= requirements
['numa']['sriov_list']
2317 for control_iface
in server
.get('networks', []):
2318 control_iface
['net_id']=control_iface
.pop('uuid')
2319 #Get the brifge name
2321 result
, content
= db
.get_table(FROM
='nets',
2322 SELECT
=('name', 'type', 'vlan', 'provider', 'enable_dhcp',
2323 'dhcp_first_ip', 'dhcp_last_ip', 'cidr'),
2324 WHERE
={'uuid': control_iface
['net_id']})
2329 return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface
['net_id']
2332 if control_iface
.get("type", 'virtual') == 'virtual':
2333 if network
['type']!='bridge_data' and network
['type']!='bridge_man':
2334 return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface
['net_id']
2335 resources
['bridged-ifaces'].append(control_iface
)
2336 if network
.get("provider") and network
["provider"][0:3] == "OVS":
2337 control_iface
["type"] = "instance:ovs"
2339 control_iface
["type"] = "instance:bridge"
2340 if network
.get("vlan"):
2341 control_iface
["vlan"] = network
["vlan"]
2343 if network
.get("enable_dhcp") == 'true':
2344 control_iface
["enable_dhcp"] = network
.get("enable_dhcp")
2345 control_iface
["dhcp_first_ip"] = network
["dhcp_first_ip"]
2346 control_iface
["dhcp_last_ip"] = network
["dhcp_last_ip"]
2347 control_iface
["cidr"] = network
["cidr"]
2349 if network
['type']!='data' and network
['type']!='ptp':
2350 return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface
['net_id']
2351 #dataplane interface, look for it in the numa tree and asign this network
2353 for dataplane_iface
in numa_dict
['interfaces']:
2354 if dataplane_iface
['name'] == control_iface
.get("name"):
2355 if (dataplane_iface
['dedicated'] == "yes" and control_iface
["type"] != "PF") or \
2356 (dataplane_iface
['dedicated'] == "no" and control_iface
["type"] != "VF") or \
2357 (dataplane_iface
['dedicated'] == "yes:sriov" and control_iface
["type"] != "VFnotShared") :
2358 return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
2359 (control_iface
.get("name"), dataplane_iface
['dedicated'], control_iface
["type"])
2360 dataplane_iface
['uuid'] = control_iface
['net_id']
2361 if dataplane_iface
['dedicated'] == "no":
2362 dataplane_iface
['vlan'] = network
['vlan']
2363 if dataplane_iface
['dedicated'] != "yes" and control_iface
.get("mac_address"):
2364 dataplane_iface
['mac_address'] = control_iface
.get("mac_address")
2365 if control_iface
.get("vpci"):
2366 dataplane_iface
['vpci'] = control_iface
.get("vpci")
2370 return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface
.get("name")
2372 resources
['host_id'] = host_id
2373 resources
['image_id'] = server
['image_id']
2374 resources
['flavor_id'] = server
['flavor_id']
2375 resources
['tenant_id'] = server
['tenant_id']
2376 resources
['ram'] = requirements
['ram']
2377 resources
['vcpus'] = requirements
['vcpus']
2378 resources
['status'] = 'CREATING'
2380 if 'description' in server
: resources
['description'] = server
['description']
2381 if 'name' in server
: resources
['name'] = server
['name']
2383 resources
['extended'] = {} #optional
2384 resources
['extended']['numas'] = []
2385 numa_dict
['numa_id'] = numa_id
2386 numa_dict
['memory'] = requirements
['numa']['memory']
2387 numa_dict
['cores'] = []
2389 for core
in cpu_pinning
:
2390 numa_dict
['cores'].append({'id': core
[2], 'vthread': core
[0], 'paired': paired
})
2391 for core
in reserved_threads
:
2392 numa_dict
['cores'].append({'id': core
})
2393 resources
['extended']['numas'].append(numa_dict
)
2394 if extended
!=None and 'devices' in extended
: #TODO allow extra devices without numa
2395 resources
['extended']['devices'] = extended
['devices']
2398 # '===================================={'
2399 #print json.dumps(resources, indent=4)
2400 #print '====================================}'