Fix minor bugs in dhcp service during vm deployment
[osm/openvim.git] / osm_openvim / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
This is the thread that interacts with the host and with libvirt to manage VMs.
One thread is launched per host.
27 '''
28 __author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__ = "$10-jul-2014 12:07:15$"
30
31 import json
32 import yaml
33 import threading
34 import time
35 import Queue
36 import paramiko
37 from jsonschema import validate as js_v, exceptions as js_e
38 #import libvirt
39 import imp
40 from vim_schema import localinfo_schema, hostinfo_schema
41 import random
42 import os
43 import logging
44
45
class host_thread(threading.Thread):
    """Thread in charge of managing one compute host through SSH and libvirt."""
    # libvirt python binding, shared by every host thread; loaded lazily at the
    # first non-test __init__ because the package may be absent on dev installs
    lvirt_module = None

    def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode,
                 develop_bridge_iface, logger_name=None, debug=None):
        '''Init a thread.
        Arguments:
            'name': name of thread
            'host', 'user': host ip or name to manage, and the ssh user
            'db', 'db_lock': database class and lock to use it in exclusion
            'test': test/fake mode; no ssh nor libvirt connection is done
            'image_path': remote folder where VM image files are stored
            'host_id': uuid of this host at the database
            'version': openvim version
            'develop_mode': relax hugepages/cpu-pinning requirements when True
            'develop_bridge_iface': bridge used for dataplane ifaces in develop mode
            'logger_name': optional logger name, defaults to "openvim.host."+name
            'debug': optional logging level name (e.g. "DEBUG") for this logger
        '''
        threading.Thread.__init__(self)
        self.name = name
        self.host = host
        self.user = user
        self.db = db
        self.db_lock = db_lock
        self.test = test
        self.localinfo_dirty = False  # True when self.localinfo must be flushed to the host

        # lazy, process-wide import of python-libvirt; skipped in test mode
        if not test and not host_thread.lvirt_module:
            try:
                module_info = imp.find_module("libvirt")
                host_thread.lvirt_module = imp.load_module("libvirt", *module_info)
            except (IOError, ImportError) as e:
                raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e))
        if logger_name:
            self.logger_name = logger_name
        else:
            self.logger_name = "openvim.host."+name
        self.logger = logging.getLogger(self.logger_name)
        if debug:
            self.logger.setLevel(getattr(logging, debug))


        self.develop_mode = develop_mode
        self.develop_bridge_iface = develop_bridge_iface
        self.image_path = image_path
        self.host_id = host_id
        self.version = version

        self.xml_level = 0  # current indentation depth while building domain XML

        self.server_status = {} #dictionary with pairs server_uuid:server_status
        self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
        self.next_update_server_status = 0 #time when servers status must be checked again

        self.hostinfo = None  # parsed hostinfo.yaml, set by load_hostinfo()

        self.queueLock = threading.Lock()       # protects taskQueue accesses
        self.taskQueue = Queue.Queue(2000)      # tasks consumed by run()
        self.ssh_conn = None                    # paramiko client, set by ssh_connect()
100
101 def ssh_connect(self):
102 try:
103 #Connect SSH
104 self.ssh_conn = paramiko.SSHClient()
105 self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
106 self.ssh_conn.load_system_host_keys()
107 self.ssh_conn.connect(self.host, username=self.user, timeout=10) #, None)
108 except paramiko.ssh_exception.SSHException as e:
109 text = e.args[0]
110 self.logger.error("ssh_connect ssh Exception: " + text)
111
    def load_localinfo(self):
        """Load the per-host persistent state file <image_path>/.openvim.yaml.

        On success self.localinfo holds the validated content and the dirty flag
        is cleared. On any failure (and always in test mode) a default empty
        structure is installed instead.
        """
        if not self.test:
            try:
                #Connect SSH
                self.ssh_connect()

                # make sure the remote image folder exists
                command = 'mkdir -p ' + self.image_path
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stderr.read()
                if len(content) > 0:
                    self.logger.error("command: '%s' stderr: '%s'", command, content)

                command = 'cat ' + self.image_path + '/.openvim.yaml'
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stdout.read()
                if len(content) == 0:
                    self.logger.error("command: '%s' stderr='%s'", command, stderr.read())
                    raise paramiko.ssh_exception.SSHException("Error empty file, command: '{}'".format(command))
                # NOTE(review): yaml.load without a Loader trusts the file content;
                # acceptable only because the file is written by openvim itself
                self.localinfo = yaml.load(content)
                js_v(self.localinfo, localinfo_schema)
                self.localinfo_dirty = False
                if 'server_files' not in self.localinfo:
                    self.localinfo['server_files'] = {}
                self.logger.debug("localinfo load from host")
                return

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                self.logger.error("load_localinfo ssh Exception: " + text)
            # lvirt_module is guaranteed loaded here because test mode returned above
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("load_localinfo libvirt Exception: " + text)
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                self.logger.error("load_localinfo yaml format Exception " + text)
            except js_e.ValidationError as e:
                text = ""
                if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
                self.logger.error("load_localinfo format Exception: %s %s", text, str(e))
            except Exception as e:
                text = str(e)
                self.logger.error("load_localinfo Exception: " + text)

        #not loaded (or test mode): insert a default data; keep dirty flag off
        self.localinfo = {'files':{}, 'server_files':{} }
        self.localinfo_dirty=False
164
165 def load_hostinfo(self):
166 if self.test:
167 return;
168 try:
169 #Connect SSH
170 self.ssh_connect()
171
172
173 command = 'cat ' + self.image_path + '/hostinfo.yaml'
174 #print self.name, ': command:', command
175 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
176 content = stdout.read()
177 if len(content) == 0:
178 self.logger.error("command: '%s' stderr: '%s'", command, stderr.read())
179 raise paramiko.ssh_exception.SSHException("Error empty file ")
180 self.hostinfo = yaml.load(content)
181 js_v(self.hostinfo, hostinfo_schema)
182 self.logger.debug("hostlinfo load from host " + str(self.hostinfo))
183 return
184
185 except paramiko.ssh_exception.SSHException as e:
186 text = e.args[0]
187 self.logger.error("load_hostinfo ssh Exception: " + text)
188 except host_thread.lvirt_module.libvirtError as e:
189 text = e.get_error_message()
190 self.logger.error("load_hostinfo libvirt Exception: " + text)
191 except yaml.YAMLError as exc:
192 text = ""
193 if hasattr(exc, 'problem_mark'):
194 mark = exc.problem_mark
195 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
196 self.logger.error("load_hostinfo yaml format Exception " + text)
197 except js_e.ValidationError as e:
198 text = ""
199 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
200 self.logger.error("load_hostinfo format Exception: %s %s", text, e.message)
201 except Exception as e:
202 text = str(e)
203 self.logger.error("load_hostinfo Exception: " + text)
204
205 #not loaded, insert a default data
206 self.hostinfo = None
207
    def save_localinfo(self, tries=3):
        """Write self.localinfo back to <image_path>/.openvim.yaml at the host.

        Retries up to 'tries' times, reconnecting the ssh session when it
        dropped. In test mode only the dirty flag is cleared.
        :param tries: maximum number of attempts
        """
        if self.test:
            self.localinfo_dirty = False
            return

        while tries>=0:
            tries-=1

            try:
                command = 'cat > ' + self.image_path + '/.openvim.yaml'
                self.logger.debug("command:" + command)
                (stdin, _, _) = self.ssh_conn.exec_command(command)
                # dump the yaml straight into the remote cat's stdin
                yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
                self.localinfo_dirty = False
                break #while tries

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                self.logger.error("save_localinfo ssh Exception: " + text)
                if "SSH session not active" in text:
                    # reconnect and let the loop retry
                    self.ssh_connect()
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("save_localinfo libvirt Exception: " + text)
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                self.logger.error("save_localinfo yaml format Exception " + text)
            except Exception as e:
                text = str(e)
                self.logger.error("save_localinfo Exception: " + text)
241
242 def load_servers_from_db(self):
243 self.db_lock.acquire()
244 r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
245 self.db_lock.release()
246
247 self.server_status = {}
248 if r<0:
249 self.logger.error("Error getting data from database: " + c)
250 return
251 for server in c:
252 self.server_status[ server['uuid'] ] = server['status']
253
254 #convert from old version to new one
255 if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
256 server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
257 if server_files_dict['source file'][-5:] == 'qcow2':
258 server_files_dict['file format'] = 'qcow2'
259
260 self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
261 if 'inc_files' in self.localinfo:
262 del self.localinfo['inc_files']
263 self.localinfo_dirty = True
264
265 def delete_unused_files(self):
266 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
267 Deletes unused entries at self.loacalinfo and the corresponding local files.
268 The only reason for this mismatch is the manual deletion of instances (VM) at database
269 '''
270 if self.test:
271 return
272 for uuid,images in self.localinfo['server_files'].items():
273 if uuid not in self.server_status:
274 for localfile in images.values():
275 try:
276 self.logger.debug("deleting file '%s' of unused server '%s'", localfile['source file'], uuid)
277 self.delete_file(localfile['source file'])
278 except paramiko.ssh_exception.SSHException as e:
279 self.logger.error("Exception deleting file '%s': %s", localfile['source file'], str(e))
280 del self.localinfo['server_files'][uuid]
281 self.localinfo_dirty = True
282
283 def insert_task(self, task, *aditional):
284 try:
285 self.queueLock.acquire()
286 task = self.taskQueue.put( (task,) + aditional, timeout=5)
287 self.queueLock.release()
288 return 1, None
289 except Queue.Full:
290 return -1, "timeout inserting a task over host " + self.name
291
    def run(self):
        """Main thread loop: load host state, then serve tasks from taskQueue.

        When idle it flushes dirty localinfo, refreshes server status every 5
        seconds and force-kills servers whose terminate deadline expired.
        A 'reload' task restarts the outer loop; 'exit' ends the thread.
        """
        while True:
            self.load_localinfo()
            self.load_hostinfo()
            self.load_servers_from_db()
            self.delete_unused_files()
            while True:
                try:
                    self.queueLock.acquire()
                    if not self.taskQueue.empty():
                        task = self.taskQueue.get()
                    else:
                        task = None
                    self.queueLock.release()

                    if task is None:
                        # no pending task: do background housekeeping instead
                        now=time.time()
                        if self.localinfo_dirty:
                            self.save_localinfo()
                        elif self.next_update_server_status < now:
                            self.update_servers_status()
                            self.next_update_server_status = now + 5
                        elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
                            self.server_forceoff()
                        else:
                            time.sleep(1)
                        continue

                    if task[0] == 'instance':
                        self.logger.debug("processing task instance " + str(task[1]['action']))
                        retry = 0
                        while retry < 2:
                            retry += 1
                            # second (last) try is flagged so action_on_server can give up
                            r = self.action_on_server(task[1], retry==2)
                            if r >= 0:
                                break
                    elif task[0] == 'image':
                        pass
                    elif task[0] == 'exit':
                        self.logger.debug("processing task exit")
                        self.terminate()
                        return 0
                    elif task[0] == 'reload':
                        self.logger.debug("processing task reload terminating and relaunching")
                        self.terminate()
                        break
                    elif task[0] == 'edit-iface':
                        self.logger.debug("processing task edit-iface port={}, old_net={}, new_net={}".format(
                            task[1], task[2], task[3]))
                        self.edit_iface(task[1], task[2], task[3])
                    elif task[0] == 'restore-iface':
                        self.logger.debug("processing task restore-iface={} mac={}".format(task[1], task[2]))
                        self.restore_iface(task[1], task[2])
                    elif task[0] == 'new-ovsbridge':
                        self.logger.debug("Creating compute OVS bridge")
                        self.create_ovs_bridge()
                    elif task[0] == 'new-vxlan':
                        self.logger.debug("Creating vxlan tunnel='{}', remote ip='{}'".format(task[1], task[2]))
                        self.create_ovs_vxlan_tunnel(task[1], task[2])
                    elif task[0] == 'del-ovsbridge':
                        self.logger.debug("Deleting OVS bridge")
                        self.delete_ovs_bridge()
                    elif task[0] == 'del-vxlan':
                        self.logger.debug("Deleting vxlan {} tunnel".format(task[1]))
                        self.delete_ovs_vxlan_tunnel(task[1])
                    elif task[0] == 'create-ovs-bridge-port':
                        self.logger.debug("Adding port ovim-{} to OVS bridge".format(task[1]))
                        self.create_ovs_bridge_port(task[1])
                    elif task[0] == 'del-ovs-port':
                        self.logger.debug("Delete bridge attached to ovs port vlan {} net {}".format(task[1], task[2]))
                        self.delete_bridge_port_attached_to_ovs(task[1], task[2])
                    else:
                        self.logger.debug("unknown task " + str(task))

                except Exception as e:
                    # the thread must never die on a bad task: log and keep serving
                    self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
368
369 def server_forceoff(self, wait_until_finished=False):
370 while len(self.pending_terminate_server)>0:
371 now = time.time()
372 if self.pending_terminate_server[0][0]>now:
373 if wait_until_finished:
374 time.sleep(1)
375 continue
376 else:
377 return
378 req={'uuid':self.pending_terminate_server[0][1],
379 'action':{'terminate':'force'},
380 'status': None
381 }
382 self.action_on_server(req)
383 self.pending_terminate_server.pop(0)
384
385 def terminate(self):
386 try:
387 self.server_forceoff(True)
388 if self.localinfo_dirty:
389 self.save_localinfo()
390 if not self.test:
391 self.ssh_conn.close()
392 except Exception as e:
393 text = str(e)
394 self.logger.error("terminate Exception: " + text)
395 self.logger.debug("exit from host_thread")
396
397 def get_local_iface_name(self, generic_name):
398 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
399 return self.hostinfo["iface_names"][generic_name]
400 return generic_name
401
402 def create_xml_server(self, server, dev_list, server_metadata={}):
403 """Function that implements the generation of the VM XML definition.
404 Additional devices are in dev_list list
405 The main disk is upon dev_list[0]"""
406
407 #get if operating system is Windows
408 windows_os = False
409 os_type = server_metadata.get('os_type', None)
410 if os_type == None and 'metadata' in dev_list[0]:
411 os_type = dev_list[0]['metadata'].get('os_type', None)
412 if os_type != None and os_type.lower() == "windows":
413 windows_os = True
414 #get type of hard disk bus
415 bus_ide = True if windows_os else False
416 bus = server_metadata.get('bus', None)
417 if bus == None and 'metadata' in dev_list[0]:
418 bus = dev_list[0]['metadata'].get('bus', None)
419 if bus != None:
420 bus_ide = True if bus=='ide' else False
421
422 self.xml_level = 0
423
424 text = "<domain type='kvm'>"
425 #get topology
426 topo = server_metadata.get('topology', None)
427 if topo == None and 'metadata' in dev_list[0]:
428 topo = dev_list[0]['metadata'].get('topology', None)
429 #name
430 name = server.get('name','') + "_" + server['uuid']
431 name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58
432 text += self.inc_tab() + "<name>" + name+ "</name>"
433 #uuid
434 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
435
436 numa={}
437 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
438 numa = server['extended']['numas'][0]
439 #memory
440 use_huge = False
441 memory = int(numa.get('memory',0))*1024*1024 #in KiB
442 if memory==0:
443 memory = int(server['ram'])*1024;
444 else:
445 if not self.develop_mode:
446 use_huge = True
447 if memory==0:
448 return -1, 'No memory assigned to instance'
449 memory = str(memory)
450 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
451 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
452 if use_huge:
453 text += self.tab()+'<memoryBacking>'+ \
454 self.inc_tab() + '<hugepages/>'+ \
455 self.dec_tab()+ '</memoryBacking>'
456
457 #cpu
458 use_cpu_pinning=False
459 vcpus = int(server.get("vcpus",0))
460 cpu_pinning = []
461 if 'cores-source' in numa:
462 use_cpu_pinning=True
463 for index in range(0, len(numa['cores-source'])):
464 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
465 vcpus += 1
466 if 'threads-source' in numa:
467 use_cpu_pinning=True
468 for index in range(0, len(numa['threads-source'])):
469 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
470 vcpus += 1
471 if 'paired-threads-source' in numa:
472 use_cpu_pinning=True
473 for index in range(0, len(numa['paired-threads-source'])):
474 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
475 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
476 vcpus += 2
477
478 if use_cpu_pinning and not self.develop_mode:
479 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
480 self.tab()+'<cputune>'
481 self.xml_level += 1
482 for i in range(0, len(cpu_pinning)):
483 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
484 text += self.dec_tab()+'</cputune>'+ \
485 self.tab() + '<numatune>' +\
486 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
487 self.dec_tab() + '</numatune>'
488 else:
489 if vcpus==0:
490 return -1, "Instance without number of cpus"
491 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
492
493 #boot
494 boot_cdrom = False
495 for dev in dev_list:
496 if dev['type']=='cdrom' :
497 boot_cdrom = True
498 break
499 text += self.tab()+ '<os>' + \
500 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
501 if boot_cdrom:
502 text += self.tab() + "<boot dev='cdrom'/>"
503 text += self.tab() + "<boot dev='hd'/>" + \
504 self.dec_tab()+'</os>'
505 #features
506 text += self.tab()+'<features>'+\
507 self.inc_tab()+'<acpi/>' +\
508 self.tab()+'<apic/>' +\
509 self.tab()+'<pae/>'+ \
510 self.dec_tab() +'</features>'
511 if topo == "oneSocket:hyperthreading":
512 if vcpus % 2 != 0:
513 return -1, 'Cannot expose hyperthreading with an odd number of vcpus'
514 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='2' /> </cpu>" % vcpus/2
515 elif windows_os or topo == "oneSocket":
516 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>" % vcpus
517 else:
518 text += self.tab() + "<cpu mode='host-model'></cpu>"
519 text += self.tab() + "<clock offset='utc'/>" +\
520 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
521 self.tab() + "<on_reboot>restart</on_reboot>" + \
522 self.tab() + "<on_crash>restart</on_crash>"
523 text += self.tab() + "<devices>" + \
524 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
525 self.tab() + "<serial type='pty'>" +\
526 self.inc_tab() + "<target port='0'/>" + \
527 self.dec_tab() + "</serial>" +\
528 self.tab() + "<console type='pty'>" + \
529 self.inc_tab()+ "<target type='serial' port='0'/>" + \
530 self.dec_tab()+'</console>'
531 if windows_os:
532 text += self.tab() + "<controller type='usb' index='0'/>" + \
533 self.tab() + "<controller type='ide' index='0'/>" + \
534 self.tab() + "<input type='mouse' bus='ps2'/>" + \
535 self.tab() + "<sound model='ich6'/>" + \
536 self.tab() + "<video>" + \
537 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
538 self.dec_tab() + "</video>" + \
539 self.tab() + "<memballoon model='virtio'/>" + \
540 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
541
542 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
543 #> self.dec_tab()+'</hostdev>\n' +\
544 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
545 if windows_os:
546 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
547 else:
548 #If image contains 'GRAPH' include graphics
549 #if 'GRAPH' in image:
550 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
551 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
552 self.dec_tab() + "</graphics>"
553
554 vd_index = 'a'
555 for dev in dev_list:
556 bus_ide_dev = bus_ide
557 if dev['type']=='cdrom' or dev['type']=='disk':
558 if dev['type']=='cdrom':
559 bus_ide_dev = True
560 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
561 if 'file format' in dev:
562 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
563 if 'source file' in dev:
564 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
565 #elif v['type'] == 'block':
566 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
567 #else:
568 # return -1, 'Unknown disk type ' + v['type']
569 vpci = dev.get('vpci',None)
570 if vpci == None:
571 vpci = dev['metadata'].get('vpci',None)
572 text += self.pci2xml(vpci)
573
574 if bus_ide_dev:
575 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
576 else:
577 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
578 text += self.dec_tab() + '</disk>'
579 vd_index = chr(ord(vd_index)+1)
580 elif dev['type']=='xml':
581 dev_text = dev['xml']
582 if 'vpci' in dev:
583 dev_text = dev_text.replace('__vpci__', dev['vpci'])
584 if 'source file' in dev:
585 dev_text = dev_text.replace('__file__', dev['source file'])
586 if 'file format' in dev:
587 dev_text = dev_text.replace('__format__', dev['source file'])
588 if '__dev__' in dev_text:
589 dev_text = dev_text.replace('__dev__', vd_index)
590 vd_index = chr(ord(vd_index)+1)
591 text += dev_text
592 else:
593 return -1, 'Unknown device type ' + dev['type']
594
595 net_nb=0
596 bridge_interfaces = server.get('networks', [])
597 for v in bridge_interfaces:
598 #Get the brifge name
599 self.db_lock.acquire()
600 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
601 self.db_lock.release()
602 if result <= 0:
603 self.logger.error("create_xml_server ERROR %d getting nets %s", result, content)
604 return -1, content
605 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
606 #I know it is not secure
607 #for v in sorted(desc['network interfaces'].itervalues()):
608 model = v.get("model", None)
609 if content[0]['provider']=='default':
610 text += self.tab() + "<interface type='network'>" + \
611 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
612 elif content[0]['provider'][0:7]=='macvtap':
613 text += self.tab()+"<interface type='direct'>" + \
614 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
615 self.tab() + "<target dev='macvtap0'/>"
616 if windows_os:
617 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
618 elif model==None:
619 model = "virtio"
620 elif content[0]['provider'][0:6]=='bridge':
621 text += self.tab() + "<interface type='bridge'>" + \
622 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
623 if windows_os:
624 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
625 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
626 elif model==None:
627 model = "virtio"
628 elif content[0]['provider'][0:3] == "OVS":
629 vlan = content[0]['provider'].replace('OVS:', '')
630 text += self.tab() + "<interface type='bridge'>" + \
631 self.inc_tab() + "<source bridge='ovim-" + str(vlan) + "'/>"
632 else:
633 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
634 if model!=None:
635 text += self.tab() + "<model type='" +model+ "'/>"
636 if v.get('mac_address', None) != None:
637 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
638 text += self.pci2xml(v.get('vpci',None))
639 text += self.dec_tab()+'</interface>'
640
641 net_nb += 1
642
643 interfaces = numa.get('interfaces', [])
644
645 net_nb=0
646 for v in interfaces:
647 if self.develop_mode: #map these interfaces to bridges
648 text += self.tab() + "<interface type='bridge'>" + \
649 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
650 if windows_os:
651 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
652 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
653 else:
654 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
655 if v.get('mac_address', None) != None:
656 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
657 text += self.pci2xml(v.get('vpci',None))
658 text += self.dec_tab()+'</interface>'
659 continue
660
661 if v['dedicated'] == 'yes': #passthrought
662 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
663 self.inc_tab() + "<source>"
664 self.inc_tab()
665 text += self.pci2xml(v['source'])
666 text += self.dec_tab()+'</source>'
667 text += self.pci2xml(v.get('vpci',None))
668 if windows_os:
669 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
670 text += self.dec_tab()+'</hostdev>'
671 net_nb += 1
672 else: #sriov_interfaces
673 #skip not connected interfaces
674 if v.get("net_id") == None:
675 continue
676 text += self.tab() + "<interface type='hostdev' managed='yes'>"
677 self.inc_tab()
678 if v.get('mac_address', None) != None:
679 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
680 text+= self.tab()+'<source>'
681 self.inc_tab()
682 text += self.pci2xml(v['source'])
683 text += self.dec_tab()+'</source>'
684 if v.get('vlan',None) != None:
685 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
686 text += self.pci2xml(v.get('vpci',None))
687 if windows_os:
688 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
689 text += self.dec_tab()+'</interface>'
690
691
692 text += self.dec_tab()+'</devices>'+\
693 self.dec_tab()+'</domain>'
694 return 0, text
695
696 def pci2xml(self, pci):
697 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
698 alows an empty pci text'''
699 if pci is None:
700 return ""
701 first_part = pci.split(':')
702 second_part = first_part[2].split('.')
703 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
704 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
705 "' function='0x" + second_part[1] + "'/>"
706
707 def tab(self):
708 """Return indentation according to xml_level"""
709 return "\n" + (' '*self.xml_level)
710
711 def inc_tab(self):
712 """Increment and return indentation according to xml_level"""
713 self.xml_level += 1
714 return self.tab()
715
716 def dec_tab(self):
717 """Decrement and return indentation according to xml_level"""
718 self.xml_level -= 1
719 return self.tab()
720
721 def create_ovs_bridge(self):
722 """
723 Create a bridge in compute OVS to allocate VMs
724 :return: True if success
725 """
726 if self.test:
727 return True
728 try:
729 command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
730 self.logger.debug("command: " + command)
731 (_, stdout, _) = self.ssh_conn.exec_command(command)
732 content = stdout.read()
733 if len(content) == 0:
734 return True
735 else:
736 return False
737 except paramiko.ssh_exception.SSHException as e:
738 self.logger.error("create_ovs_bridge ssh Exception: " + str(e))
739 if "SSH session not active" in str(e):
740 self.ssh_connect()
741 return False
742
743 def delete_port_to_ovs_bridge(self, vlan, net_uuid):
744 """
745 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
746 :param vlan: vlan port id
747 :param net_uuid: network id
748 :return:
749 """
750
751 if self.test:
752 return True
753 try:
754 port_name = 'ovim-' + str(vlan)
755 command = 'sudo ovs-vsctl del-port br-int ' + port_name
756 self.logger.debug("command: " + command)
757 (_, stdout, _) = self.ssh_conn.exec_command(command)
758 content = stdout.read()
759 if len(content) == 0:
760 return True
761 else:
762 return False
763 except paramiko.ssh_exception.SSHException as e:
764 self.logger.error("delete_port_to_ovs_bridge ssh Exception: " + str(e))
765 if "SSH session not active" in str(e):
766 self.ssh_connect()
767 return False
768
769 def delete_dhcp_server(self, vlan, net_uuid, dhcp_path):
770 """
771 Delete dhcp server process lining in namespace
772 :param vlan: segmentation id
773 :param net_uuid: network uuid
774 :param dhcp_path: conf fiel path that live in namespace side
775 :return:
776 """
777 if self.test:
778 return True
779 if not self.is_dhcp_port_free(vlan, net_uuid):
780 return True
781 try:
782 net_namespace = 'ovim-' + str(vlan)
783 dhcp_path = os.path.join(dhcp_path, net_namespace)
784 pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')
785
786 command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_file
787 self.logger.debug("command: " + command)
788 (_, stdout, _) = self.ssh_conn.exec_command(command)
789 content = stdout.read()
790
791 command = 'sudo ip netns exec ' + net_namespace + ' kill -9 ' + content
792 self.logger.debug("command: " + command)
793 (_, stdout, _) = self.ssh_conn.exec_command(command)
794 content = stdout.read()
795
796 # if len(content) == 0:
797 # return True
798 # else:
799 # return False
800 except paramiko.ssh_exception.SSHException as e:
801 self.logger.error("delete_dhcp_server ssh Exception: " + str(e))
802 if "SSH session not active" in str(e):
803 self.ssh_connect()
804 return False
805
806 def is_dhcp_port_free(self, host_id, net_uuid):
807 """
808 Check if any port attached to the a net in a vxlan mesh across computes nodes
809 :param host_id: host id
810 :param net_uuid: network id
811 :return: True if is not free
812 """
813 self.db_lock.acquire()
814 result, content = self.db.get_table(
815 FROM='ports',
816 WHERE={'p.type': 'instance:ovs', 'p.net_id': net_uuid}
817 )
818 self.db_lock.release()
819
820 if len(content) > 0:
821 return False
822 else:
823 return True
824
825 def is_port_free(self, host_id, net_uuid):
826 """
827 Check if there not ovs ports of a network in a compute host.
828 :param host_id: host id
829 :param net_uuid: network id
830 :return: True if is not free
831 """
832
833 self.db_lock.acquire()
834 result, content = self.db.get_table(
835 FROM='ports as p join instances as i on p.instance_id=i.uuid',
836 WHERE={"i.host_id": self.host_id, 'p.type': 'instance:ovs', 'p.net_id': net_uuid}
837 )
838 self.db_lock.release()
839
840 if len(content) > 0:
841 return False
842 else:
843 return True
844
845 def add_port_to_ovs_bridge(self, vlan):
846 """
847 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
848 :param vlan: vlan port id
849 :return: True if success
850 """
851
852 if self.test:
853 return True
854 try:
855 port_name = 'ovim-' + str(vlan)
856 command = 'sudo ovs-vsctl add-port br-int ' + port_name + ' tag=' + str(vlan)
857 self.logger.debug("command: " + command)
858 (_, stdout, _) = self.ssh_conn.exec_command(command)
859 content = stdout.read()
860 if len(content) == 0:
861 return True
862 else:
863 return False
864 except paramiko.ssh_exception.SSHException as e:
865 self.logger.error("add_port_to_ovs_bridge ssh Exception: " + str(e))
866 if "SSH session not active" in str(e):
867 self.ssh_connect()
868 return False
869
    def delete_dhcp_port(self, vlan, net_uuid):
        """
        Delete the dhcp interfaces (veth pair / namespace tap) of a net, but only when
        no 'instance:ovs' port of the net remains attached anywhere in the mesh.
        :param vlan: segmentation id
        :param net_uuid: network id
        :return: True if success
        """

        if self.test:
            return True

        # NOTE(review): is_dhcp_port_free() takes (host_id, net_uuid) and vlan is
        # passed as host_id here; that argument is unused by the method so behavior
        # is unaffected — confirm if host_id is ever used
        if not self.is_dhcp_port_free(vlan, net_uuid):
            # ports still in use by some VM: keep the dhcp interfaces
            return True
        self.delete_dhcp_interfaces(vlan)
        return True
885
886 def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid):
887 """
888 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
889 :param vlan:
890 :param net_uuid:
891 :return: True if success
892 """
893 if self.test:
894 return
895
896 if not self.is_port_free(vlan, net_uuid):
897 return True
898 self.delete_port_to_ovs_bridge(vlan, net_uuid)
899 self.delete_linux_bridge(vlan)
900 return True
901
902 def delete_linux_bridge(self, vlan):
903 """
904 Delete a linux bridge in a scpecific compute.
905 :param vlan: vlan port id
906 :return: True if success
907 """
908
909 if self.test:
910 return True
911 try:
912 port_name = 'ovim-' + str(vlan)
913 command = 'sudo ip link set dev veth0-' + str(vlan) + ' down'
914 self.logger.debug("command: " + command)
915 (_, stdout, _) = self.ssh_conn.exec_command(command)
916 # content = stdout.read()
917 #
918 # if len(content) != 0:
919 # return False
920 command = 'sudo ifconfig ' + port_name + ' down && sudo brctl delbr ' + port_name
921 self.logger.debug("command: " + command)
922 (_, stdout, _) = self.ssh_conn.exec_command(command)
923 content = stdout.read()
924 if len(content) == 0:
925 return True
926 else:
927 return False
928 except paramiko.ssh_exception.SSHException as e:
929 self.logger.error("delete_linux_bridge ssh Exception: " + str(e))
930 if "SSH session not active" in str(e):
931 self.ssh_connect()
932 return False
933
934 def create_ovs_bridge_port(self, vlan):
935 """
936 Generate a linux bridge and attache the port to a OVS bridge
937 :param vlan: vlan port id
938 :return:
939 """
940 if self.test:
941 return
942 self.create_linux_bridge(vlan)
943 self.add_port_to_ovs_bridge(vlan)
944
945 def create_linux_bridge(self, vlan):
946 """
947 Create a linux bridge with STP active
948 :param vlan: netowrk vlan id
949 :return:
950 """
951
952 if self.test:
953 return True
954 try:
955 port_name = 'ovim-' + str(vlan)
956 command = 'sudo brctl show | grep ' + port_name
957 self.logger.debug("command: " + command)
958 (_, stdout, _) = self.ssh_conn.exec_command(command)
959 content = stdout.read()
960
961 # if exist nothing to create
962 # if len(content) == 0:
963 # return False
964
965 command = 'sudo brctl addbr ' + port_name
966 self.logger.debug("command: " + command)
967 (_, stdout, _) = self.ssh_conn.exec_command(command)
968 content = stdout.read()
969
970 # if len(content) == 0:
971 # return True
972 # else:
973 # return False
974
975 command = 'sudo brctl stp ' + port_name + ' on'
976 self.logger.debug("command: " + command)
977 (_, stdout, _) = self.ssh_conn.exec_command(command)
978 content = stdout.read()
979
980 # if len(content) == 0:
981 # return True
982 # else:
983 # return False
984 command = 'sudo ip link set dev ' + port_name + ' up'
985 self.logger.debug("command: " + command)
986 (_, stdout, _) = self.ssh_conn.exec_command(command)
987 content = stdout.read()
988
989 if len(content) == 0:
990 return True
991 else:
992 return False
993 except paramiko.ssh_exception.SSHException as e:
994 self.logger.error("create_linux_bridge ssh Exception: " + str(e))
995 if "SSH session not active" in str(e):
996 self.ssh_connect()
997 return False
998
    def set_mac_dhcp_server(self, ip, mac, vlan, netmask, dhcp_path):
        """
        Write into the dnsmasq hosts file a rule assigning a fixed ip to a specific MAC address.
        :param ip: IP address assigned to a VM
        :param mac: VM vnic mac to be matched with the IP received
        :param vlan: Segmentation id
        :param netmask: netmask value (not used by this method; kept for interface compatibility)
        :param dhcp_path: base dhcp conf path that lives in the namespace side
        :return: True if success
        """

        if self.test:
            return True

        net_namespace = 'ovim-' + str(vlan)
        # the hosts file is a file named after the namespace, inside the per-namespace
        # dir that launch_dhcp_server() passes to dnsmasq as --dhcp-hostsdir
        # NOTE(review): this yields <dhcp_path>/<ns>/<ns> — confirm the double join is intended
        dhcp_path = os.path.join(dhcp_path, net_namespace)
        dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)

        if not ip:
            return False
        try:
            # dnsmasq dhcp-host entry format: MAC,IP
            ip_data = mac.upper() + ',' + ip

            # make sure the hosts file exists before appending to it
            command = 'sudo ip netns exec ' + net_namespace + ' touch ' + dhcp_hostsdir
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' sudo bash -ec "echo ' + ip_data + ' >> ' + dhcp_hostsdir + '"'

            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # an empty stdout means the append succeeded
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
1042
1043 def delete_mac_dhcp_server(self, ip, mac, vlan, dhcp_path):
1044 """
1045 Delete into dhcp conf file the ip assigned to a specific MAC address
1046
1047 :param ip: IP address asigned to a VM
1048 :param mac: VM vnic mac to be macthed with the IP received
1049 :param vlan: Segmentation id
1050 :param dhcp_path: dhcp conf file path that live in namespace side
1051 :return:
1052 """
1053
1054 if self.test:
1055 return False
1056 try:
1057 net_namespace = 'ovim-' + str(vlan)
1058 dhcp_path = os.path.join(dhcp_path, net_namespace)
1059 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1060
1061 if not ip:
1062 return False
1063
1064 ip_data = mac.upper() + ',' + ip
1065
1066 command = 'sudo ip netns exec ' + net_namespace + ' sudo sed -i \'/' + ip_data + '/d\' ' + dhcp_hostsdir
1067 self.logger.debug("command: " + command)
1068 (_, stdout, _) = self.ssh_conn.exec_command(command)
1069 content = stdout.read()
1070
1071 if len(content) == 0:
1072 return True
1073 else:
1074 return False
1075
1076 except paramiko.ssh_exception.SSHException as e:
1077 self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
1078 if "SSH session not active" in str(e):
1079 self.ssh_connect()
1080 return False
1081
1082 def launch_dhcp_server(self, vlan, ip_range, netmask, dhcp_path, gateway):
1083 """
1084 Generate a linux bridge and attache the port to a OVS bridge
1085 :param self:
1086 :param vlan: Segmentation id
1087 :param ip_range: IP dhcp range
1088 :param netmask: network netmask
1089 :param dhcp_path: dhcp conf file path that live in namespace side
1090 :param gateway: Gateway address for dhcp net
1091 :return: True if success
1092 """
1093
1094 if self.test:
1095 return True
1096 try:
1097 interface = 'tap-' + str(vlan)
1098 net_namespace = 'ovim-' + str(vlan)
1099 dhcp_path = os.path.join(dhcp_path, net_namespace)
1100 leases_path = os.path.join(dhcp_path, "dnsmasq.leases")
1101 pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')
1102
1103 dhcp_range = ip_range[0] + ',' + ip_range[1] + ',' + netmask
1104
1105 command = 'sudo ip netns exec ' + net_namespace + ' mkdir -p ' + dhcp_path
1106 self.logger.debug("command: " + command)
1107 (_, stdout, _) = self.ssh_conn.exec_command(command)
1108 content = stdout.read()
1109
1110 pid_path = os.path.join(dhcp_path, 'dnsmasq.pid')
1111 command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_path
1112 self.logger.debug("command: " + command)
1113 (_, stdout, _) = self.ssh_conn.exec_command(command)
1114 content = stdout.read()
1115 # check if pid is runing
1116 pid_status_path = content
1117 if content:
1118 command = "ps aux | awk '{print $2 }' | grep " + pid_status_path
1119 self.logger.debug("command: " + command)
1120 (_, stdout, _) = self.ssh_conn.exec_command(command)
1121 content = stdout.read()
1122 if not content:
1123 command = 'sudo ip netns exec ' + net_namespace + ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \
1124 '--interface=' + interface + ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path + \
1125 ' --dhcp-range ' + dhcp_range + ' --pid-file=' + pid_file + ' --dhcp-leasefile=' + leases_path + \
1126 ' --listen-address ' + gateway
1127
1128 self.logger.debug("command: " + command)
1129 (_, stdout, _) = self.ssh_conn.exec_command(command)
1130 content = stdout.readline()
1131
1132 if len(content) == 0:
1133 return True
1134 else:
1135 return False
1136 except paramiko.ssh_exception.SSHException as e:
1137 self.logger.error("launch_dhcp_server ssh Exception: " + str(e))
1138 if "SSH session not active" in str(e):
1139 self.ssh_connect()
1140 return False
1141
1142 def delete_dhcp_interfaces(self, vlan):
1143 """
1144 Create a linux bridge with STP active
1145 :param vlan: netowrk vlan id
1146 :return:
1147 """
1148
1149 if self.test:
1150 return True
1151 try:
1152 net_namespace = 'ovim-' + str(vlan)
1153 command = 'sudo ovs-vsctl del-port br-int ovs-tap-' + str(vlan)
1154 self.logger.debug("command: " + command)
1155 (_, stdout, _) = self.ssh_conn.exec_command(command)
1156 content = stdout.read()
1157
1158 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' down'
1159 self.logger.debug("command: " + command)
1160 (_, stdout, _) = self.ssh_conn.exec_command(command)
1161 content = stdout.read()
1162
1163 command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' down'
1164 self.logger.debug("command: " + command)
1165 (_, stdout, _) = self.ssh_conn.exec_command(command)
1166 content = stdout.read()
1167 except paramiko.ssh_exception.SSHException as e:
1168 self.logger.error("delete_dhcp_interfaces ssh Exception: " + str(e))
1169 if "SSH session not active" in str(e):
1170 self.ssh_connect()
1171 return False
1172
    def create_dhcp_interfaces(self, vlan, ip_listen_address, netmask):
        """
        Create the net namespace of a vlan and a veth pair for the dhcp service:
        'tap-<vlan>' lives inside the namespace (where dnsmasq listens) and
        'ovs-tap-<vlan>' is attached to br-int with the vlan tag.
        (Docstring fixed: it previously described linux bridge creation.)
        :param vlan: segmentation id
        :param ip_listen_address: Listen Ip address for the dhcp service, assigned to the tap interface living in the namespace side
        :param netmask: dhcp net netmask
        :return: True if success
        """

        if self.test:
            return True
        try:
            net_namespace = 'ovim-' + str(vlan)
            namespace_interface = 'tap-' + str(vlan)

            command = 'sudo ip netns add ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # veth pair: tap-<vlan> (namespace end) <-> ovs-tap-<vlan> (host/ovs end)
            command = 'sudo ip link add tap-' + str(vlan) + ' type veth peer name ovs-tap-' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # attach the host end to the integration bridge with the net's vlan tag
            command = 'sudo ovs-vsctl add-port br-int ovs-tap-' + str(vlan) + ' tag=' + str(vlan)
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # move the other end inside the namespace
            command = 'sudo ip link set tap-' + str(vlan) + ' netns ' + net_namespace
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' up'
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # give the namespace end the address dnsmasq will listen on
            command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig ' + namespace_interface \
                      + ' ' + ip_listen_address + ' netmask ' + netmask
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # only the last command's stdout decides success
            if len(content) == 0:
                return True
            else:
                return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("create_dhcp_interfaces ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
1233
1234
1235 def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
1236 """
1237 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
1238 :param vxlan_interface: vlxan inteface name.
1239 :param remote_ip: tunnel endpoint remote compute ip.
1240 :return:
1241 """
1242 if self.test:
1243 return True
1244 try:
1245 command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
1246 ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
1247 ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
1248 self.logger.debug("command: " + command)
1249 (_, stdout, _) = self.ssh_conn.exec_command(command)
1250 content = stdout.read()
1251 # print content
1252 if len(content) == 0:
1253 return True
1254 else:
1255 return False
1256 except paramiko.ssh_exception.SSHException as e:
1257 self.logger.error("create_ovs_vxlan_tunnel ssh Exception: " + str(e))
1258 if "SSH session not active" in str(e):
1259 self.ssh_connect()
1260 return False
1261
1262 def delete_ovs_vxlan_tunnel(self, vxlan_interface):
1263 """
1264 Delete a vlxan tunnel port from a OVS brdige.
1265 :param vxlan_interface: vlxan name to be delete it.
1266 :return: True if success.
1267 """
1268 if self.test:
1269 return True
1270 try:
1271 command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
1272 self.logger.debug("command: " + command)
1273 (_, stdout, _) = self.ssh_conn.exec_command(command)
1274 content = stdout.read()
1275 # print content
1276 if len(content) == 0:
1277 return True
1278 else:
1279 return False
1280 except paramiko.ssh_exception.SSHException as e:
1281 self.logger.error("delete_ovs_vxlan_tunnel ssh Exception: " + str(e))
1282 if "SSH session not active" in str(e):
1283 self.ssh_connect()
1284 return False
1285
1286 def delete_ovs_bridge(self):
1287 """
1288 Delete a OVS bridge from a compute.
1289 :return: True if success
1290 """
1291 if self.test:
1292 return True
1293 try:
1294 command = 'sudo ovs-vsctl del-br br-int'
1295 self.logger.debug("command: " + command)
1296 (_, stdout, _) = self.ssh_conn.exec_command(command)
1297 content = stdout.read()
1298 if len(content) == 0:
1299 return True
1300 else:
1301 return False
1302 except paramiko.ssh_exception.SSHException as e:
1303 self.logger.error("delete_ovs_bridge ssh Exception: " + str(e))
1304 if "SSH session not active" in str(e):
1305 self.ssh_connect()
1306 return False
1307
    def get_file_info(self, path):
        """
        Stat a remote file through 'ls -lL' over ssh.
        :param path: remote file path
        :return: None if the file does not exist, otherwise the ls output split on
                 spaces: (permission, 1, owner, group, size, date, file)
        """
        command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
        self.logger.debug("command: " + command)
        (_, stdout, _) = self.ssh_conn.exec_command(command)
        content = stdout.read()
        if len(content) == 0:
            return None # file does not exist
        else:
            # NOTE(review): split(" ") keeps empty strings if ls pads fields with
            # multiple spaces; callers compare fields [4] (size) and [5] (date) of two
            # results split the same way, so this stays consistent — verify before reuse
            return content.split(" ") # (permission, 1, owner, group, size, date, file)
1317
1318 def qemu_get_info(self, path):
1319 command = 'qemu-img info ' + path
1320 self.logger.debug("command: " + command)
1321 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
1322 content = stdout.read()
1323 if len(content) == 0:
1324 error = stderr.read()
1325 self.logger.error("get_qemu_info error " + error)
1326 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
1327 else:
1328 try:
1329 return yaml.load(content)
1330 except yaml.YAMLError as exc:
1331 text = ""
1332 if hasattr(exc, 'problem_mark'):
1333 mark = exc.problem_mark
1334 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
1335 self.logger.error("get_qemu_info yaml format Exception " + text)
1336 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
1337
1338 def qemu_change_backing(self, inc_file, new_backing_file):
1339 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
1340 self.logger.debug("command: " + command)
1341 (_, _, stderr) = self.ssh_conn.exec_command(command)
1342 content = stderr.read()
1343 if len(content) == 0:
1344 return 0
1345 else:
1346 self.logger.error("qemu_change_backing error: " + content)
1347 return -1
1348
1349 def get_notused_filename(self, proposed_name, suffix=''):
1350 '''Look for a non existing file_name in the host
1351 proposed_name: proposed file name, includes path
1352 suffix: suffix to be added to the name, before the extention
1353 '''
1354 extension = proposed_name.rfind(".")
1355 slash = proposed_name.rfind("/")
1356 if extension < 0 or extension < slash: # no extension
1357 extension = len(proposed_name)
1358 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
1359 info = self.get_file_info(target_name)
1360 if info is None:
1361 return target_name
1362
1363 index=0
1364 while info is not None:
1365 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
1366 index+=1
1367 info = self.get_file_info(target_name)
1368 return target_name
1369
1370 def get_notused_path(self, proposed_path, suffix=''):
1371 '''Look for a non existing path at database for images
1372 proposed_path: proposed file name, includes path
1373 suffix: suffix to be added to the name, before the extention
1374 '''
1375 extension = proposed_path.rfind(".")
1376 if extension < 0:
1377 extension = len(proposed_path)
1378 if suffix != None:
1379 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
1380 index=0
1381 while True:
1382 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
1383 if r<=0:
1384 return target_path
1385 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
1386 index+=1
1387
1388
1389 def delete_file(self, file_name):
1390 command = 'rm -f '+file_name
1391 self.logger.debug("command: " + command)
1392 (_, _, stderr) = self.ssh_conn.exec_command(command)
1393 error_msg = stderr.read()
1394 if len(error_msg) > 0:
1395 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
1396
1397 def copy_file(self, source, destination, perserve_time=True):
1398 if source[0:4]=="http":
1399 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
1400 dst=destination, src=source, dst_result=destination + ".result" )
1401 else:
1402 command = 'cp --no-preserve=mode'
1403 if perserve_time:
1404 command += ' --preserve=timestamps'
1405 command += " '{}' '{}'".format(source, destination)
1406 self.logger.debug("command: " + command)
1407 (_, _, stderr) = self.ssh_conn.exec_command(command)
1408 error_msg = stderr.read()
1409 if len(error_msg) > 0:
1410 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
1411
    def copy_remote_file(self, remote_file, use_incremental):
        ''' Copy a file from the repository to local folder and recursively
        copy the backing files in case the remote file is incremental.
        Reads and/or modifies self.localinfo['files'], the cache that maps remote
        image paths to their unmodified local copies.
        params:
            remote_file: path of remote file (http(s) URL or host-local path)
            use_incremental: None (leave the decision to this function), True, False
        return:
            local_file: name of local file
            qemu_info: dict with qemu-img information of local file
            use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
        '''

        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None
        file_from_local = True

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        if remote_file[0:4] == "http":
            file_from_local = False
        if file_from_local:
            qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        #copy recursivelly the backing files
        if file_from_local and 'backing file' in qemu_remote_info:
            # backing chains are always cached incrementally (use_incremental=True)
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally (cache hit)
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info =  self.get_file_info(local_file)
            if file_from_local:
                remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                local_file = None
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    # best effort: a stale cache entry or missing file is not fatal
                    pass
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None
                

        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            # NOTE(review): use_incremental_out is passed as copy_file's perserve_time
            # argument, so timestamps are preserved exactly when the cache validity
            # check above (field [5], the date) will later rely on them — confirm intended
            self.copy_file(remote_file, local_file, use_incremental_out)

            if use_incremental_out:
                self.localinfo['files'][remote_file] = local_file
            if new_backing_file:
                self.qemu_change_backing(local_file, new_backing_file)
            qemu_info = self.qemu_get_info(local_file)

        return local_file, qemu_info, use_incremental_out
1479
1480 def launch_server(self, conn, server, rebuild=False, domain=None):
1481 if self.test:
1482 time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
1483 return 0, 'Success'
1484
1485 server_id = server['uuid']
1486 paused = server.get('paused','no')
1487 try:
1488 if domain!=None and rebuild==False:
1489 domain.resume()
1490 #self.server_status[server_id] = 'ACTIVE'
1491 return 0, 'Success'
1492
1493 self.db_lock.acquire()
1494 result, server_data = self.db.get_instance(server_id)
1495 self.db_lock.release()
1496 if result <= 0:
1497 self.logger.error("launch_server ERROR getting server from DB %d %s", result, server_data)
1498 return result, server_data
1499
1500 #0: get image metadata
1501 server_metadata = server.get('metadata', {})
1502 use_incremental = None
1503
1504 if "use_incremental" in server_metadata:
1505 use_incremental = False if server_metadata["use_incremental"] == "no" else True
1506
1507 server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
1508 if rebuild:
1509 #delete previous incremental files
1510 for file_ in server_host_files.values():
1511 self.delete_file(file_['source file'] )
1512 server_host_files={}
1513
1514 #1: obtain aditional devices (disks)
1515 #Put as first device the main disk
1516 devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
1517 if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
1518 devices += server_data['extended']['devices']
1519
1520 for dev in devices:
1521 if dev['image_id'] == None:
1522 continue
1523
1524 self.db_lock.acquire()
1525 result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'),
1526 WHERE={'uuid': dev['image_id']})
1527 self.db_lock.release()
1528 if result <= 0:
1529 error_text = "ERROR", result, content, "when getting image", dev['image_id']
1530 self.logger.error("launch_server " + error_text)
1531 return -1, error_text
1532 if content[0]['metadata'] is not None:
1533 dev['metadata'] = json.loads(content[0]['metadata'])
1534 else:
1535 dev['metadata'] = {}
1536
1537 if dev['image_id'] in server_host_files:
1538 dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
1539 dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
1540 continue
1541
1542 #2: copy image to host
1543 remote_file = content[0]['path']
1544 use_incremental_image = use_incremental
1545 if dev['metadata'].get("use_incremental") == "no":
1546 use_incremental_image = False
1547 local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
1548
1549 #create incremental image
1550 if use_incremental_image:
1551 local_file_inc = self.get_notused_filename(local_file, '.inc')
1552 command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
1553 self.logger.debug("command: " + command)
1554 (_, _, stderr) = self.ssh_conn.exec_command(command)
1555 error_msg = stderr.read()
1556 if len(error_msg) > 0:
1557 raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
1558 local_file = local_file_inc
1559 qemu_info = {'file format':'qcow2'}
1560
1561 server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
1562
1563 dev['source file'] = local_file
1564 dev['file format'] = qemu_info['file format']
1565
1566 self.localinfo['server_files'][ server['uuid'] ] = server_host_files
1567 self.localinfo_dirty = True
1568
1569 #3 Create XML
1570 result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
1571 if result <0:
1572 self.logger.error("create xml server error: " + xml)
1573 return -2, xml
1574 self.logger.debug("create xml: " + xml)
1575 atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
1576 #4 Start the domain
1577 if not rebuild: #ensures that any pending destroying server is done
1578 self.server_forceoff(True)
1579 #self.logger.debug("launching instance " + xml)
1580 conn.createXML(xml, atribute)
1581 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
1582
1583 return 0, 'Success'
1584
1585 except paramiko.ssh_exception.SSHException as e:
1586 text = e.args[0]
1587 self.logger.error("launch_server id='%s' ssh Exception: %s", server_id, text)
1588 if "SSH session not active" in text:
1589 self.ssh_connect()
1590 except host_thread.lvirt_module.libvirtError as e:
1591 text = e.get_error_message()
1592 self.logger.error("launch_server id='%s' libvirt Exception: %s", server_id, text)
1593 except Exception as e:
1594 text = str(e)
1595 self.logger.error("launch_server id='%s' Exception: %s", server_id, text)
1596 return -1, text
1597
    def update_servers_status(self):
        """
        Poll libvirt for the state of every domain on this host and update the DB
        'instances' rows whose status changed. Does nothing in test mode or when no
        server status is being tracked.
        """
        # # virDomainState
        # VIR_DOMAIN_NOSTATE = 0
        # VIR_DOMAIN_RUNNING = 1
        # VIR_DOMAIN_BLOCKED = 2
        # VIR_DOMAIN_PAUSED = 3
        # VIR_DOMAIN_SHUTDOWN = 4
        # VIR_DOMAIN_SHUTOFF = 5
        # VIR_DOMAIN_CRASHED = 6
        # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended

        if self.test or len(self.server_status)==0:
            return

        try:
            # map every defined domain uuid to the openvim status it corresponds to
            conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
            domains= conn.listAllDomains()
            domain_dict={}
            for domain in domains:
                uuid = domain.UUIDString() ;
                libvirt_status = domain.state()
                #print libvirt_status
                if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
                    new_status = "ACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
                    new_status = "PAUSED"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
                    new_status = "INACTIVE"
                elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
                    new_status = "ERROR"
                else:
                    new_status = None
                domain_dict[uuid] = new_status
            conn.close()
        except host_thread.lvirt_module.libvirtError as e:
            self.logger.error("get_state() Exception " + e.get_error_message())
            return

        # Python 2 dict iteration; a tracked server missing from libvirt is INACTIVE
        for server_id, current_status in self.server_status.iteritems():
            new_status = None
            if server_id in domain_dict:
                new_status = domain_dict[server_id]
            else:
                new_status = "INACTIVE"

            if new_status == None or new_status == current_status:
                continue
            if new_status == 'INACTIVE' and current_status == 'ERROR':
                continue #keep ERROR status, because obviously this machine is not running
            #change status
            self.logger.debug("server id='%s' status change from '%s' to '%s'", server_id, current_status, new_status)
            STATUS={'progress':100, 'status':new_status}
            if new_status == 'ERROR':
                STATUS['last_error'] = 'machine has crashed'
            self.db_lock.acquire()
            r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
            self.db_lock.release()
            if r>=0:
                # only cache the new status once the DB write succeeded
                self.server_status[server_id] = new_status
1657
1658 def action_on_server(self, req, last_retry=True):
1659 '''Perform an action on a req
1660 Attributes:
1661 req: dictionary that contain:
1662 server properties: 'uuid','name','tenant_id','status'
1663 action: 'action'
1664 host properties: 'user', 'ip_name'
1665 return (error, text)
1666 0: No error. VM is updated to new state,
1667 -1: Invalid action, as trying to pause a PAUSED VM
1668 -2: Error accessing host
1669 -3: VM nor present
1670 -4: Error at DB access
1671 -5: Error while trying to perform action. VM is updated to ERROR
1672 '''
1673 server_id = req['uuid']
1674 conn = None
1675 new_status = None
1676 old_status = req['status']
1677 last_error = None
1678
1679 if self.test:
1680 if 'terminate' in req['action']:
1681 new_status = 'deleted'
1682 elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
1683 if req['status']!='ERROR':
1684 time.sleep(5)
1685 new_status = 'INACTIVE'
1686 elif 'start' in req['action'] and req['status']!='ERROR':
1687 new_status = 'ACTIVE'
1688 elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE':
1689 new_status = 'ACTIVE'
1690 elif 'pause' in req['action'] and req['status']!='ERROR':
1691 new_status = 'PAUSED'
1692 elif 'reboot' in req['action'] and req['status']!='ERROR':
1693 new_status = 'ACTIVE'
1694 elif 'rebuild' in req['action']:
1695 time.sleep(random.randint(20,150))
1696 new_status = 'ACTIVE'
1697 elif 'createImage' in req['action']:
1698 time.sleep(5)
1699 self.create_image(None, req)
1700 else:
1701 try:
1702 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1703 try:
1704 dom = conn.lookupByUUIDString(server_id)
1705 except host_thread.lvirt_module.libvirtError as e:
1706 text = e.get_error_message()
1707 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1708 dom = None
1709 else:
1710 self.logger.error("action_on_server id='%s' libvirt exception: %s", server_id, text)
1711 raise e
1712
1713 if 'forceOff' in req['action']:
1714 if dom == None:
1715 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1716 else:
1717 try:
1718 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1719 dom.destroy()
1720 except Exception as e:
1721 if "domain is not running" not in e.get_error_message():
1722 self.logger.error("action_on_server id='%s' Exception while sending force off: %s",
1723 server_id, e.get_error_message())
1724 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1725 new_status = 'ERROR'
1726
1727 elif 'terminate' in req['action']:
1728 if dom == None:
1729 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1730 new_status = 'deleted'
1731 else:
1732 try:
1733 if req['action']['terminate'] == 'force':
1734 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1735 dom.destroy()
1736 new_status = 'deleted'
1737 else:
1738 self.logger.debug("sending SHUTDOWN to server id='%s'", server_id)
1739 dom.shutdown()
1740 self.pending_terminate_server.append( (time.time()+10,server_id) )
1741 except Exception as e:
1742 self.logger.error("action_on_server id='%s' Exception while destroy: %s",
1743 server_id, e.get_error_message())
1744 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1745 new_status = 'ERROR'
1746 if "domain is not running" in e.get_error_message():
1747 try:
1748 dom.undefine()
1749 new_status = 'deleted'
1750 except Exception:
1751 self.logger.error("action_on_server id='%s' Exception while undefine: %s",
1752 server_id, e.get_error_message())
1753 last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
1754 #Exception: 'virDomainDetachDevice() failed'
1755 if new_status=='deleted':
1756 if server_id in self.server_status:
1757 del self.server_status[server_id]
1758 if req['uuid'] in self.localinfo['server_files']:
1759 for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
1760 try:
1761 self.delete_file(file_['source file'])
1762 except Exception:
1763 pass
1764 del self.localinfo['server_files'][ req['uuid'] ]
1765 self.localinfo_dirty = True
1766
1767 elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
1768 try:
1769 if dom == None:
1770 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1771 else:
1772 dom.shutdown()
1773 # new_status = 'INACTIVE'
1774 #TODO: check status for changing at database
1775 except Exception as e:
1776 new_status = 'ERROR'
1777 self.logger.error("action_on_server id='%s' Exception while shutdown: %s",
1778 server_id, e.get_error_message())
1779 last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()
1780
1781 elif 'rebuild' in req['action']:
1782 if dom != None:
1783 dom.destroy()
1784 r = self.launch_server(conn, req, True, None)
1785 if r[0] <0:
1786 new_status = 'ERROR'
1787 last_error = r[1]
1788 else:
1789 new_status = 'ACTIVE'
1790 elif 'start' in req['action']:
1791 # The instance is only create in DB but not yet at libvirt domain, needs to be create
1792 rebuild = True if req['action']['start'] == 'rebuild' else False
1793 r = self.launch_server(conn, req, rebuild, dom)
1794 if r[0] <0:
1795 new_status = 'ERROR'
1796 last_error = r[1]
1797 else:
1798 new_status = 'ACTIVE'
1799
1800 elif 'resume' in req['action']:
1801 try:
1802 if dom == None:
1803 pass
1804 else:
1805 dom.resume()
1806 # new_status = 'ACTIVE'
1807 except Exception as e:
1808 self.logger.error("action_on_server id='%s' Exception while resume: %s",
1809 server_id, e.get_error_message())
1810
1811 elif 'pause' in req['action']:
1812 try:
1813 if dom == None:
1814 pass
1815 else:
1816 dom.suspend()
1817 # new_status = 'PAUSED'
1818 except Exception as e:
1819 self.logger.error("action_on_server id='%s' Exception while pause: %s",
1820 server_id, e.get_error_message())
1821
1822 elif 'reboot' in req['action']:
1823 try:
1824 if dom == None:
1825 pass
1826 else:
1827 dom.reboot()
1828 self.logger.debug("action_on_server id='%s' reboot:", server_id)
1829 #new_status = 'ACTIVE'
1830 except Exception as e:
1831 self.logger.error("action_on_server id='%s' Exception while reboot: %s",
1832 server_id, e.get_error_message())
1833 elif 'createImage' in req['action']:
1834 self.create_image(dom, req)
1835
1836
1837 conn.close()
1838 except host_thread.lvirt_module.libvirtError as e:
1839 if conn is not None: conn.close()
1840 text = e.get_error_message()
1841 new_status = "ERROR"
1842 last_error = text
1843 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1844 self.logger.debug("action_on_server id='%s' Exception removed from host", server_id)
1845 else:
1846 self.logger.error("action_on_server id='%s' Exception %s", server_id, text)
1847 #end of if self.test
1848 if new_status == None:
1849 return 1
1850
1851 self.logger.debug("action_on_server id='%s' new status=%s %s",server_id, new_status, last_error)
1852 UPDATE = {'progress':100, 'status':new_status}
1853
1854 if new_status=='ERROR':
1855 if not last_retry: #if there will be another retry do not update database
1856 return -1
1857 elif 'terminate' in req['action']:
1858 #PUT a log in the database
1859 self.logger.error("PANIC deleting server id='%s' %s", server_id, last_error)
1860 self.db_lock.acquire()
1861 self.db.new_row('logs',
1862 {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
1863 'description':'PANIC deleting server from host '+self.name+': '+last_error}
1864 )
1865 self.db_lock.release()
1866 if server_id in self.server_status:
1867 del self.server_status[server_id]
1868 return -1
1869 else:
1870 UPDATE['last_error'] = last_error
1871 if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
1872 self.db_lock.acquire()
1873 self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
1874 self.server_status[server_id] = new_status
1875 self.db_lock.release()
1876 if new_status == 'ERROR':
1877 return -1
1878 return 1
1879
1880
1881 def restore_iface(self, name, mac, lib_conn=None):
1882 ''' make an ifdown, ifup to restore default parameter of na interface
1883 Params:
1884 mac: mac address of the interface
1885 lib_conn: connection to the libvirt, if None a new connection is created
1886 Return 0,None if ok, -1,text if fails
1887 '''
1888 conn=None
1889 ret = 0
1890 error_text=None
1891 if self.test:
1892 self.logger.debug("restore_iface '%s' %s", name, mac)
1893 return 0, None
1894 try:
1895 if not lib_conn:
1896 conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
1897 else:
1898 conn = lib_conn
1899
1900 #wait to the pending VM deletion
1901 #TODO.Revise self.server_forceoff(True)
1902
1903 iface = conn.interfaceLookupByMACString(mac)
1904 iface.destroy()
1905 iface.create()
1906 self.logger.debug("restore_iface '%s' %s", name, mac)
1907 except host_thread.lvirt_module.libvirtError as e:
1908 error_text = e.get_error_message()
1909 self.logger.error("restore_iface '%s' '%s' libvirt exception: %s", name, mac, error_text)
1910 ret=-1
1911 finally:
1912 if lib_conn is None and conn is not None:
1913 conn.close()
1914 return ret, error_text
1915
1916
    def create_image(self,dom, req):
        """Make a copy of a server disk image and register the result at the DB.

        Params:
            dom: libvirt domain of the server; unused in this method, kept for
                interface uniformity with the other per-action methods
            req: server action request; req['action']['createImage'] carries the
                source image info ('source'), the new image 'name' and an
                optional destination 'path'
        Side effects: updates the 'images' row req['new_image']['uuid'] with the
        final status ('ACTIVE'/'ERROR'), progress and destination path.
        """
        if self.test:
            # Test mode: only compute the destination path, nothing is copied
            if 'path' in req['action']['createImage']:
                file_dst = req['action']['createImage']['path']
            else:
                createImage=req['action']['createImage']
                img_name= createImage['source']['path']
                index=img_name.rfind('/')
                # keep the copy in the same directory as the source, with the new name
                file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
            image_status='ACTIVE'
        else:
            # Two attempts at most: the SSH session to the host may have dropped;
            # in that case reconnect and retry once
            for retry in (0,1):
                try:
                    server_id = req['uuid']
                    createImage=req['action']['createImage']
                    file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
                    if 'path' in req['action']['createImage']:
                        file_dst = req['action']['createImage']['path']
                    else:
                        img_name= createImage['source']['path']
                        index=img_name.rfind('/')
                        file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')

                    self.copy_file(file_orig, file_dst)
                    qemu_info = self.qemu_get_info(file_orig)
                    # If the source disk is an incremental qcow2, rebase the copy onto
                    # the path registered for that backing file at self.localinfo['files']
                    if 'backing file' in qemu_info:
                        for k,v in self.localinfo['files'].items():
                            if v==qemu_info['backing file']:
                                self.qemu_change_backing(file_dst, k)
                                break
                    image_status='ACTIVE'
                    break
                except paramiko.ssh_exception.SSHException as e:
                    image_status='ERROR'
                    error_text = e.args[0]
                    self.logger.error("create_image id='%s' ssh Exception: %s", server_id, error_text)
                    if "SSH session not active" in error_text and retry==0:
                        self.ssh_connect()
                except Exception as e:
                    image_status='ERROR'
                    error_text = str(e)
                    self.logger.error("create_image id='%s' Exception: %s", server_id, error_text)

        # NOTE(review): if both attempts fail before 'file_dst' is assigned (e.g. the
        # server_files lookup raises), the update below would raise NameError — confirm
        # callers guarantee the source image entry exists, or handle this upstream
        #TODO insert a last_error at database
        self.db_lock.acquire()
        self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
                {'uuid':req['new_image']['uuid']}, log=True)
        self.db_lock.release()
1965
    def edit_iface(self, port_id, old_net, new_net):
        """Reconfigure a dataplane SR-IOV (VF) interface when its network changes.

        The interface is removed from the domain and re-inserted so it picks up
        the proper parameters (vlan tag, guest pci address).
        Params:
            port_id: uuid of the port at the 'ports' DB table
            old_net: previous network; when set, the interface is detached
            new_net: new network; when set, the interface is (re)attached
        Returns nothing; errors are only logged.
        """
        #This action imply remove and insert interface to put proper parameters
        if self.test:
            time.sleep(1)
        else:
            #get iface details
            self.db_lock.acquire()
            r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                    WHERE={'port_id': port_id})
            self.db_lock.release()
            if r<0:
                self.logger.error("edit_iface %s DDBB error: %s", port_id, c)
                return
            elif r==0:
                self.logger.error("edit_iface %s port not found", port_id)
                return
            port=c[0]
            if port["model"]!="VF":
                self.logger.error("edit_iface %s ERROR model must be VF", port_id)
                return
            #create xml detach file
            xml=[]
            # xml_level is consumed by self.pci2xml (presumably controls the XML
            # indentation depth of the generated <address> element) — confirm
            self.xml_level = 2
            xml.append("<interface type='hostdev' managed='yes'>")
            xml.append(" <mac address='" +port['mac']+ "'/>")
            xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
            xml.append('</interface>')


            try:
                conn=None
                conn = host_thread.lvirt_module.open("qemu+ssh://"+self.user+"@"+self.host+"/system")
                dom = conn.lookupByUUIDString(port["instance_id"])
                if old_net:
                    text="\n".join(xml)
                    self.logger.debug("edit_iface detaching SRIOV interface " + text)
                    dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
                if new_net:
                    # reuse the mac/source elements already built above: replace the
                    # closing </interface> with the vlan tag, append the guest (virtual)
                    # pci address and close the element again
                    xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
                    self.xml_level = 1
                    xml.append(self.pci2xml(port.get('vpci',None)) )
                    xml.append('</interface>')
                    text="\n".join(xml)
                    self.logger.debug("edit_iface attaching SRIOV interface " + text)
                    dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)

            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("edit_iface %s libvirt exception: %s", port["instance_id"], text)

            finally:
                if conn is not None: conn.close()
2018
2019
def create_server(server, db, db_lock, only_of_ports):
    """Compute and select the host resources needed to deploy a server (VM).

    Extracts the requirements from the flavor (or its 'extended' field), selects
    an appropriate host NUMA node from the database, computes core pinning and
    dataplane (SR-IOV / passthrough) interface assignation, and resolves the
    bridge/OVS control-plane interfaces including their DHCP parameters.

    Params:
        server: dict describing the instance: 'flavor', 'image_id', 'flavor_id',
            'tenant_id', optional 'networks', 'extended', 'host_id', ...
        db, db_lock: database class and lock to use it in exclusion
        only_of_ports: True if only dataplane ports connected to openflow are allowed
    Return:
        (0, resources) on success; resources is the dict to store at the database
        (<0, error_text) on error
    """
    extended = server.get('extended', None)
    requirements={}
    requirements['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
    requirements['ram'] = server['flavor'].get('ram', 0)
    if requirements['ram']== None:
        requirements['ram'] = 0
    requirements['vcpus'] = server['flavor'].get('vcpus', 0)
    if requirements['vcpus']== None:
        requirements['vcpus'] = 0
    #If extended is not defined get requirements from flavor
    if extended is None:
        #If extended is defined in flavor convert to dictionary and use it
        if 'extended' in server['flavor'] and server['flavor']['extended'] != None:
            # flavor stores extended as a quasi-json text; normalize quotes before parsing
            json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
            extended = json.loads(json_acceptable_string)
        else:
            extended = None

    #For simplicity only one numa VM are supported in the initial implementation
    if extended != None:
        numas = extended.get('numas', [])
        if len(numas)>1:
            return (-2, "Multi-NUMA VMs are not supported yet")

        #a for loop is used in order to be ready to multi-NUMA VMs
        request = []
        for numa in numas:
            numa_req = {}
            numa_req['memory'] = numa.get('memory', 0)
            if 'cores' in numa:
                numa_req['proc_req_nb'] = numa['cores']                     #number of cores or threads to be reserved
                numa_req['proc_req_type'] = 'cores'                         #indicates whether cores or threads must be reserved
                numa_req['proc_req_list'] = numa.get('cores-id', None)      #list of ids to be assigned to the cores or threads
            elif 'paired-threads' in numa:
                numa_req['proc_req_nb'] = numa['paired-threads']
                numa_req['proc_req_type'] = 'paired-threads'
                numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
            elif 'threads' in numa:
                numa_req['proc_req_nb'] = numa['threads']
                numa_req['proc_req_type'] = 'threads'
                numa_req['proc_req_list'] = numa.get('threads-id', None)
            else:
                numa_req['proc_req_nb'] = 0 # by default
                numa_req['proc_req_type'] = 'threads'

            #Generate a list of sriov and another for physical interfaces
            interfaces = numa.get('interfaces', [])
            sriov_list = []
            port_list = []
            for iface in interfaces:
                iface['bandwidth'] = int(iface['bandwidth'])
                if iface['dedicated'][:3]=='yes':
                    port_list.append(iface)
                else:
                    sriov_list.append(iface)

            #Save lists ordered from more restrictive to less bw requirements
            numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
            numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)

            request.append(numa_req)

        #Search in db for an appropriate numa for each requested numa
        #at the moment multi-NUMA VMs are not supported
        if len(request)>0:
            requirements['numa'].update(request[0])
            if requirements['numa']['memory']>0:
                requirements['ram']=0 #By the moment I make incompatible ask for both Huge and non huge pages memory
            elif requirements['ram']==0:
                return (-1, "Memory information not set neither at extended field not at ram")
            if requirements['numa']['proc_req_nb']>0:
                requirements['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
            elif requirements['vcpus']==0:
                return (-1, "Processor information not set neither at extended field not at vcpus")

    db_lock.acquire()
    result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
    db_lock.release()

    if result == -1:
        return (-1, content)

    numa_id = content['numa_id']
    host_id = content['host_id']

    #obtain threads_id and calculate pinning
    cpu_pinning = []
    reserved_threads=[]
    paired = 'N'    # set to 'Y' only when paired-threads are requested below
    if requirements['numa']['proc_req_nb']>0:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_core',
                                       SELECT=('id','core_id','thread_id'),
                                       WHERE={'numa_id':numa_id,'instance_id': None, 'status':'ok'} )
        db_lock.release()
        if result <= 0:
            #print content
            return -1, content

        #convert rows to a dictionary indexed by core_id
        cores_dict = {}
        for row in content:
            if not row['core_id'] in cores_dict:
                cores_dict[row['core_id']] = []
            cores_dict[row['core_id']].append([row['thread_id'],row['id']])

        #In case full cores are requested
        if requirements['numa']['proc_req_type'] == 'cores':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list == None:
                vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))

            for threads in cores_dict.itervalues():
                #we need full cores
                if len(threads) != 2:
                    continue

                #set pinning for the first thread
                cpu_pinning.append( [ vcpu_id_list.pop(0), threads[0][0], threads[0][1] ] )

                #reserve so it is not used the second thread
                reserved_threads.append(threads[1][1])

                if len(vcpu_id_list) == 0:
                    break

        #In case paired threads are requested
        elif requirements['numa']['proc_req_type'] == 'paired-threads':
            paired = 'Y'
            #Get/create the list of the vcpu_ids
            if requirements['numa']['proc_req_list'] != None:
                vcpu_id_list = []
                for pair in requirements['numa']['proc_req_list']:
                    if len(pair)!=2:
                        # BUGFIX: a dead bare 'return' statement after this one was removed
                        return -1, "Field paired-threads-id not properly specified"
                    vcpu_id_list.append(pair[0])
                    vcpu_id_list.append(pair[1])
            else:
                vcpu_id_list = range(0,2*int(requirements['numa']['proc_req_nb']))

            for threads in cores_dict.itervalues():
                #we need full cores
                if len(threads) != 2:
                    continue
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #set pinning for the second thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

        #In case normal threads are requested
        elif requirements['numa']['proc_req_type'] == 'threads':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list == None:
                vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))

            # start with the cores with less free threads so full cores are kept
            # available for future 'cores'/'paired-threads' requests
            for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
                threads = cores_dict[threads_index]
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #if exists, set pinning for the second thread
                if len(threads) == 2 and len(vcpu_id_list) != 0:
                    cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

    #Get the source pci addresses for the selected numa
    used_sriov_ports = []
    for port in requirements['numa']['sriov_list']:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
        db_lock.release()
        if result <= 0:
            #print content
            return -1, content
        for row in content:
            # skip VFs already taken by a previous interface, and the root port itself
            if row['id'] in used_sriov_ports or row['id']==port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']
            del port['mac']
            port['port_id']=row['id']
            port['Mbps_used'] = port['bandwidth']
            used_sriov_ports.append(row['id'])
            break

    for port in requirements['numa']['port_list']:
        port['Mbps_used'] = None
        if port['dedicated'] != "yes:sriov":
            port['mac_address'] = port['mac']
            del port['mac']
            continue
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
        db_lock.release()
        if result <= 0:
            #print content
            return -1, content
        port['Mbps_used'] = content[0]['Mbps']
        for row in content:
            if row['id'] in used_sriov_ports or row['id']==port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac'] # mac cannot be set to passthrough ports
            del port['mac']
            port['port_id']=row['id']
            used_sriov_ports.append(row['id'])
            break

    server['host_id'] = host_id

    #Generate dictionary for saving in db the instance resources
    resources = {}
    resources['bridged-ifaces'] = []

    numa_dict = {}
    numa_dict['interfaces'] = []

    numa_dict['interfaces'] += requirements['numa']['port_list']
    numa_dict['interfaces'] += requirements['numa']['sriov_list']

    #Check bridge information
    unified_dataplane_iface=[]
    unified_dataplane_iface += requirements['numa']['port_list']
    unified_dataplane_iface += requirements['numa']['sriov_list']

    for control_iface in server.get('networks', []):
        control_iface['net_id']=control_iface.pop('uuid')
        #Get the bridge name
        db_lock.acquire()
        result, content = db.get_table(FROM='nets',
                                       SELECT=('name', 'type', 'vlan', 'provider', 'enable_dhcp',
                                               'dhcp_first_ip', 'dhcp_last_ip', 'cidr'),
                                       WHERE={'uuid': control_iface['net_id']})
        db_lock.release()
        if result < 0:
            pass    # DB error: kept as best-effort, the interface is silently skipped
        elif result==0:
            return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id']
        else:
            network=content[0]
            if control_iface.get("type", 'virtual') == 'virtual':
                if network['type']!='bridge_data' and network['type']!='bridge_man':
                    return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
                resources['bridged-ifaces'].append(control_iface)
                if network.get("provider") and network["provider"][0:3] == "OVS":
                    control_iface["type"] = "instance:ovs"
                else:
                    control_iface["type"] = "instance:bridge"
                if network.get("vlan"):
                    control_iface["vlan"] = network["vlan"]

                # BUGFIX: the database returns enable_dhcp as an integer/boolean, so the
                # old comparison against the string 'true' never matched and the DHCP
                # parameters were never propagated to the instance interface
                if network.get("enable_dhcp"):
                    control_iface["enable_dhcp"] = network.get("enable_dhcp")
                    control_iface["dhcp_first_ip"] = network["dhcp_first_ip"]
                    control_iface["dhcp_last_ip"] = network["dhcp_last_ip"]
                    control_iface["cidr"] = network["cidr"]
            else:
                if network['type']!='data' and network['type']!='ptp':
                    return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
                #dataplane interface, look for it in the numa tree and asign this network
                iface_found=False
                for dataplane_iface in numa_dict['interfaces']:
                    if dataplane_iface['name'] == control_iface.get("name"):
                        if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
                            (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
                            (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared") :
                            return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
                                (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
                        dataplane_iface['uuid'] = control_iface['net_id']
                        if dataplane_iface['dedicated'] == "no":
                            dataplane_iface['vlan'] = network['vlan']
                        if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
                            dataplane_iface['mac_address'] = control_iface.get("mac_address")
                        if control_iface.get("vpci"):
                            dataplane_iface['vpci'] = control_iface.get("vpci")
                        iface_found=True
                        break
                if not iface_found:
                    return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name")

    resources['host_id'] = host_id
    resources['image_id'] = server['image_id']
    resources['flavor_id'] = server['flavor_id']
    resources['tenant_id'] = server['tenant_id']
    resources['ram'] = requirements['ram']
    resources['vcpus'] = requirements['vcpus']
    resources['status'] = 'CREATING'

    if 'description' in server: resources['description'] = server['description']
    if 'name' in server: resources['name'] = server['name']

    resources['extended'] = {} #optional
    resources['extended']['numas'] = []
    numa_dict['numa_id'] = numa_id
    numa_dict['memory'] = requirements['numa']['memory']
    numa_dict['cores'] = []

    for core in cpu_pinning:
        numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
    for core in reserved_threads:
        numa_dict['cores'].append({'id': core})
    resources['extended']['numas'].append(numa_dict)
    if extended!=None and 'devices' in extended: #TODO allow extra devices without numa
        resources['extended']['devices'] = extended['devices']

    return 0, resources
2356