Allow specifying ssh key file for compute nodes and network controller
[osm/openvim.git] / osm_openvim / host_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
'''
This is a thread that interacts with the host and libvirt to manage VMs.
One thread is launched per host.
'''
28 __author__ = "Pablo Montes, Alfonso Tierno, Leonardo Mirabal"
29 __date__ = "$10-jul-2014 12:07:15$"
30
31 import json
32 import yaml
33 import threading
34 import time
35 import Queue
36 import paramiko
37 # import subprocess
38 # import libvirt
39 import imp
40 import random
41 import os
42 import logging
43 from jsonschema import validate as js_v, exceptions as js_e
44 from vim_schema import localinfo_schema, hostinfo_schema
45
46
47 class host_thread(threading.Thread):
48 lvirt_module = None
49
    def __init__(self, name, host, user, db, db_lock, test, image_path, host_id, version, develop_mode,
                 develop_bridge_iface, password=None, keyfile = None, logger_name=None, debug=None):
        '''Init a thread.
        Arguments:
            'id' number of thead
            'name' name of thread
            'host','user': host ip or name to manage and user
            'db', 'db_lock': database class and lock to use it in exclusion
            'test': when True run in test mode: no ssh nor libvirt connections are made
            'password'/'keyfile': ssh credentials; keyfile is also propagated to the libvirt URI
        '''
        threading.Thread.__init__(self)
        self.name = name
        self.host = host
        self.user = user
        self.db = db
        self.db_lock = db_lock
        self.test = test
        self.password = password
        self.keyfile = keyfile
        self.localinfo_dirty = False   # True when self.localinfo must be flushed back to the host

        # import python-libvirt once per process (stored as a class attribute), only outside test mode
        if not test and not host_thread.lvirt_module:
            try:
                module_info = imp.find_module("libvirt")
                host_thread.lvirt_module = imp.load_module("libvirt", *module_info)
            except (IOError, ImportError) as e:
                raise ImportError("Cannot import python-libvirt. Openvim not properly installed" +str(e))
        if logger_name:
            self.logger_name = logger_name
        else:
            self.logger_name = "openvim.host."+name
        self.logger = logging.getLogger(self.logger_name)
        if debug:
            # 'debug' is a logging level name such as "DEBUG"/"INFO"
            self.logger.setLevel(getattr(logging, debug))

        self.develop_mode = develop_mode
        self.develop_bridge_iface = develop_bridge_iface
        self.image_path = image_path
        self.host_id = host_id
        self.version = version

        self.xml_level = 0   # current indentation level used by tab()/inc_tab()/dec_tab()
        #self.pending ={}

        self.server_status = {} #dictionary with pairs server_uuid:server_status
        self.pending_terminate_server =[] #list with pairs (time,server_uuid) time to send a terminate for a server being destroyed
        self.next_update_server_status = 0 #time when must be check servers status

        self.hostinfo = None

        self.queueLock = threading.Lock()
        self.taskQueue = Queue.Queue(2000)
        self.ssh_conn = None
        # libvirt-over-ssh URI; no_tty/no_verify avoid interactive prompts on connect
        self.lvirt_conn_uri = "qemu+ssh://{user}@{host}/system?no_tty=1&no_verify=1".format(
            user=self.user, host=self.host)
        if keyfile:
            self.lvirt_conn_uri += "&keyfile=" + keyfile
107
    def ssh_connect(self):
        """Open (or re-open) the paramiko SSH session to self.host.

        Uses password and/or keyfile credentials given at construction.
        Errors are logged, not raised: callers simply retry by calling again.
        """
        try:
            # Connect SSH
            self.ssh_conn = paramiko.SSHClient()
            self.ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_conn.load_system_host_keys()
            self.ssh_conn.connect(self.host, username=self.user, password=self.password, key_filename=self.keyfile,
                                  timeout=10) #, None)
        except paramiko.ssh_exception.SSHException as e:
            text = e.args[0]
            self.logger.error("ssh_connect ssh Exception: " + text)
119
    def load_localinfo(self):
        """Load the per-host state file <image_path>/.openvim.yaml over ssh
        into self.localinfo, validating it against localinfo_schema.

        On any failure (and always in test mode) a default empty structure
        is installed instead.
        """
        if not self.test:
            try:
                # Connect SSH
                self.ssh_connect()

                # make sure the image directory exists on the host
                command = 'mkdir -p ' + self.image_path
                # print self.name, ': command:', command
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stderr.read()
                if len(content) > 0:
                    self.logger.error("command: '%s' stderr: '%s'", command, content)

                command = 'cat ' + self.image_path + '/.openvim.yaml'
                # print self.name, ': command:', command
                (_, stdout, stderr) = self.ssh_conn.exec_command(command)
                content = stdout.read()
                if len(content) == 0:
                    self.logger.error("command: '%s' stderr='%s'", command, stderr.read())
                    raise paramiko.ssh_exception.SSHException("Error empty file, command: '{}'".format(command))
                self.localinfo = yaml.load(content)
                js_v(self.localinfo, localinfo_schema)
                self.localinfo_dirty = False
                if 'server_files' not in self.localinfo:
                    self.localinfo['server_files'] = {}
                self.logger.debug("localinfo load from host")
                return

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                self.logger.error("load_localinfo ssh Exception: " + text)
            # NOTE(review): this clause assumes host_thread.lvirt_module was
            # loaded by __init__ (always true when not in test mode)
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("load_localinfo libvirt Exception: " + text)
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                self.logger.error("load_localinfo yaml format Exception " + text)
            except js_e.ValidationError as e:
                text = ""
                if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
                self.logger.error("load_localinfo format Exception: %s %s", text, str(e))
            except Exception as e:
                text = str(e)
                self.logger.error("load_localinfo Exception: " + text)

        #not loaded, insert a default data and force saving by activating dirty flag
        self.localinfo = {'files':{}, 'server_files':{} }
        #self.localinfo_dirty=True
        self.localinfo_dirty=False
172
173 def load_hostinfo(self):
174 if self.test:
175 return;
176 try:
177 #Connect SSH
178 self.ssh_connect()
179
180
181 command = 'cat ' + self.image_path + '/hostinfo.yaml'
182 #print self.name, ': command:', command
183 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
184 content = stdout.read()
185 if len(content) == 0:
186 self.logger.error("command: '%s' stderr: '%s'", command, stderr.read())
187 raise paramiko.ssh_exception.SSHException("Error empty file ")
188 self.hostinfo = yaml.load(content)
189 js_v(self.hostinfo, hostinfo_schema)
190 self.logger.debug("hostlinfo load from host " + str(self.hostinfo))
191 return
192
193 except paramiko.ssh_exception.SSHException as e:
194 text = e.args[0]
195 self.logger.error("load_hostinfo ssh Exception: " + text)
196 except host_thread.lvirt_module.libvirtError as e:
197 text = e.get_error_message()
198 self.logger.error("load_hostinfo libvirt Exception: " + text)
199 except yaml.YAMLError as exc:
200 text = ""
201 if hasattr(exc, 'problem_mark'):
202 mark = exc.problem_mark
203 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
204 self.logger.error("load_hostinfo yaml format Exception " + text)
205 except js_e.ValidationError as e:
206 text = ""
207 if len(e.path)>0: text=" at '" + ":".join(map(str, e.path))+"'"
208 self.logger.error("load_hostinfo format Exception: %s %s", text, e.message)
209 except Exception as e:
210 text = str(e)
211 self.logger.error("load_hostinfo Exception: " + text)
212
213 #not loaded, insert a default data
214 self.hostinfo = None
215
    def save_localinfo(self, tries=3):
        """Write self.localinfo as YAML to <image_path>/.openvim.yaml on the
        host (piped through a remote 'cat >'), retrying on failure.

        :param tries: maximum number of attempts; the ssh session is
            re-established when it reports being inactive.
        In test mode nothing is written, the dirty flag is just cleared.
        """
        if self.test:
            self.localinfo_dirty = False
            return

        while tries>=0:
            tries-=1

            try:
                command = 'cat > ' + self.image_path + '/.openvim.yaml'
                self.logger.debug("command:" + command)
                (stdin, _, _) = self.ssh_conn.exec_command(command)
                # dump straight into the remote cat's stdin
                yaml.safe_dump(self.localinfo, stdin, explicit_start=True, indent=4, default_flow_style=False, tags=False, encoding='utf-8', allow_unicode=True)
                self.localinfo_dirty = False
                break #while tries

            except paramiko.ssh_exception.SSHException as e:
                text = e.args[0]
                self.logger.error("save_localinfo ssh Exception: " + text)
                if "SSH session not active" in text:
                    self.ssh_connect()
            # assumes host_thread.lvirt_module was loaded by __init__ (not test mode)
            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("save_localinfo libvirt Exception: " + text)
            except yaml.YAMLError as exc:
                text = ""
                if hasattr(exc, 'problem_mark'):
                    mark = exc.problem_mark
                    text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
                self.logger.error("save_localinfo yaml format Exception " + text)
            except Exception as e:
                text = str(e)
                self.logger.error("save_localinfo Exception: " + text)
249
    def load_servers_from_db(self):
        """Fill self.server_status with {uuid: status} for every instance
        assigned to this host in database; also migrates legacy
        'inc_files' localinfo entries to the 'server_files' format.
        """
        self.db_lock.acquire()
        r,c = self.db.get_table(SELECT=('uuid','status', 'image_id'), FROM='instances', WHERE={'host_id': self.host_id})
        self.db_lock.release()

        self.server_status = {}
        if r<0:
            self.logger.error("Error getting data from database: " + c)
            return
        for server in c:
            self.server_status[ server['uuid'] ] = server['status']

            #convert from old version to new one
            if 'inc_files' in self.localinfo and server['uuid'] in self.localinfo['inc_files']:
                server_files_dict = {'source file': self.localinfo['inc_files'][ server['uuid'] ] [0], 'file format':'raw' }
                if server_files_dict['source file'][-5:] == 'qcow2':
                    server_files_dict['file format'] = 'qcow2'

                self.localinfo['server_files'][ server['uuid'] ] = { server['image_id'] : server_files_dict }
        # drop the legacy key once converted and force saving
        # NOTE(review): indentation reconstructed as 'after the loop' so every
        # server is converted before 'inc_files' is removed — confirm upstream
        if 'inc_files' in self.localinfo:
            del self.localinfo['inc_files']
            self.localinfo_dirty = True
272
273 def delete_unused_files(self):
274 '''Compares self.localinfo['server_files'] content with real servers running self.server_status obtained from database
275 Deletes unused entries at self.loacalinfo and the corresponding local files.
276 The only reason for this mismatch is the manual deletion of instances (VM) at database
277 '''
278 if self.test:
279 return
280 for uuid,images in self.localinfo['server_files'].items():
281 if uuid not in self.server_status:
282 for localfile in images.values():
283 try:
284 self.logger.debug("deleting file '%s' of unused server '%s'", localfile['source file'], uuid)
285 self.delete_file(localfile['source file'])
286 except paramiko.ssh_exception.SSHException as e:
287 self.logger.error("Exception deleting file '%s': %s", localfile['source file'], str(e))
288 del self.localinfo['server_files'][uuid]
289 self.localinfo_dirty = True
290
291 def insert_task(self, task, *aditional):
292 try:
293 self.queueLock.acquire()
294 task = self.taskQueue.put( (task,) + aditional, timeout=5)
295 self.queueLock.release()
296 return 1, None
297 except Queue.Full:
298 return -1, "timeout inserting a task over host " + self.name
299
    def run(self):
        """Thread main loop: load host and local state, then process tasks
        from self.taskQueue forever.

        Special tasks: 'exit' ends the thread; 'reload' re-reads host state
        and restarts the inner loop. While idle, the loop flushes a dirty
        localinfo, refreshes server status every 5 seconds, or force-kills
        servers whose pending-terminate time has arrived.
        """
        while True:
            self.load_localinfo()
            self.load_hostinfo()
            self.load_servers_from_db()
            self.delete_unused_files()
            while True:
                try:
                    self.queueLock.acquire()
                    if not self.taskQueue.empty():
                        task = self.taskQueue.get()
                    else:
                        task = None
                    self.queueLock.release()

                    if task is None:
                        # idle housekeeping
                        now=time.time()
                        if self.localinfo_dirty:
                            self.save_localinfo()
                        elif self.next_update_server_status < now:
                            self.update_servers_status()
                            self.next_update_server_status = now + 5
                        elif len(self.pending_terminate_server)>0 and self.pending_terminate_server[0][0]<now:
                            self.server_forceoff()
                        else:
                            time.sleep(1)
                        continue

                    if task[0] == 'instance':
                        self.logger.debug("processing task instance " + str(task[1]['action']))
                        retry = 0
                        while retry < 2:
                            retry += 1
                            r = self.action_on_server(task[1], retry==2)   # retry==2 flags the last attempt
                            if r >= 0:
                                break
                    elif task[0] == 'image':
                        pass
                    elif task[0] == 'exit':
                        self.logger.debug("processing task exit")
                        self.terminate()
                        return 0
                    elif task[0] == 'reload':
                        self.logger.debug("processing task reload terminating and relaunching")
                        self.terminate()
                        break
                    elif task[0] == 'edit-iface':
                        self.logger.debug("processing task edit-iface port={}, old_net={}, new_net={}".format(
                            task[1], task[2], task[3]))
                        self.edit_iface(task[1], task[2], task[3])
                    elif task[0] == 'restore-iface':
                        self.logger.debug("processing task restore-iface={} mac={}".format(task[1], task[2]))
                        self.restore_iface(task[1], task[2])
                    elif task[0] == 'new-ovsbridge':
                        self.logger.debug("Creating compute OVS bridge")
                        self.create_ovs_bridge()
                    elif task[0] == 'new-vxlan':
                        self.logger.debug("Creating vxlan tunnel='{}', remote ip='{}'".format(task[1], task[2]))
                        self.create_ovs_vxlan_tunnel(task[1], task[2])
                    elif task[0] == 'del-ovsbridge':
                        self.logger.debug("Deleting OVS bridge")
                        self.delete_ovs_bridge()
                    elif task[0] == 'del-vxlan':
                        self.logger.debug("Deleting vxlan {} tunnel".format(task[1]))
                        self.delete_ovs_vxlan_tunnel(task[1])
                    elif task[0] == 'create-ovs-bridge-port':
                        self.logger.debug("Adding port ovim-{} to OVS bridge".format(task[1]))
                        self.create_ovs_bridge_port(task[1])
                    elif task[0] == 'del-ovs-port':
                        self.logger.debug("Delete bridge attached to ovs port vlan {} net {}".format(task[1], task[2]))
                        self.delete_bridge_port_attached_to_ovs(task[1], task[2])
                    else:
                        self.logger.debug("unknown task " + str(task))

                except Exception as e:
                    self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
376
377 def server_forceoff(self, wait_until_finished=False):
378 while len(self.pending_terminate_server)>0:
379 now = time.time()
380 if self.pending_terminate_server[0][0]>now:
381 if wait_until_finished:
382 time.sleep(1)
383 continue
384 else:
385 return
386 req={'uuid':self.pending_terminate_server[0][1],
387 'action':{'terminate':'force'},
388 'status': None
389 }
390 self.action_on_server(req)
391 self.pending_terminate_server.pop(0)
392
393 def terminate(self):
394 try:
395 self.server_forceoff(True)
396 if self.localinfo_dirty:
397 self.save_localinfo()
398 if not self.test:
399 self.ssh_conn.close()
400 except Exception as e:
401 text = str(e)
402 self.logger.error("terminate Exception: " + text)
403 self.logger.debug("exit from host_thread")
404
405 def get_local_iface_name(self, generic_name):
406 if self.hostinfo != None and "iface_names" in self.hostinfo and generic_name in self.hostinfo["iface_names"]:
407 return self.hostinfo["iface_names"][generic_name]
408 return generic_name
409
410 def create_xml_server(self, server, dev_list, server_metadata={}):
411 """Function that implements the generation of the VM XML definition.
412 Additional devices are in dev_list list
413 The main disk is upon dev_list[0]"""
414
415 #get if operating system is Windows
416 windows_os = False
417 os_type = server_metadata.get('os_type', None)
418 if os_type == None and 'metadata' in dev_list[0]:
419 os_type = dev_list[0]['metadata'].get('os_type', None)
420 if os_type != None and os_type.lower() == "windows":
421 windows_os = True
422 #get type of hard disk bus
423 bus_ide = True if windows_os else False
424 bus = server_metadata.get('bus', None)
425 if bus == None and 'metadata' in dev_list[0]:
426 bus = dev_list[0]['metadata'].get('bus', None)
427 if bus != None:
428 bus_ide = True if bus=='ide' else False
429
430 self.xml_level = 0
431
432 text = "<domain type='kvm'>"
433 #get topology
434 topo = server_metadata.get('topology', None)
435 if topo == None and 'metadata' in dev_list[0]:
436 topo = dev_list[0]['metadata'].get('topology', None)
437 #name
438 name = server.get('name','') + "_" + server['uuid']
439 name = name[:58] #qemu impose a length limit of 59 chars or not start. Using 58
440 text += self.inc_tab() + "<name>" + name+ "</name>"
441 #uuid
442 text += self.tab() + "<uuid>" + server['uuid'] + "</uuid>"
443
444 numa={}
445 if 'extended' in server and server['extended']!=None and 'numas' in server['extended']:
446 numa = server['extended']['numas'][0]
447 #memory
448 use_huge = False
449 memory = int(numa.get('memory',0))*1024*1024 #in KiB
450 if memory==0:
451 memory = int(server['ram'])*1024;
452 else:
453 if not self.develop_mode:
454 use_huge = True
455 if memory==0:
456 return -1, 'No memory assigned to instance'
457 memory = str(memory)
458 text += self.tab() + "<memory unit='KiB'>" +memory+"</memory>"
459 text += self.tab() + "<currentMemory unit='KiB'>" +memory+ "</currentMemory>"
460 if use_huge:
461 text += self.tab()+'<memoryBacking>'+ \
462 self.inc_tab() + '<hugepages/>'+ \
463 self.dec_tab()+ '</memoryBacking>'
464
465 #cpu
466 use_cpu_pinning=False
467 vcpus = int(server.get("vcpus",0))
468 cpu_pinning = []
469 if 'cores-source' in numa:
470 use_cpu_pinning=True
471 for index in range(0, len(numa['cores-source'])):
472 cpu_pinning.append( [ numa['cores-id'][index], numa['cores-source'][index] ] )
473 vcpus += 1
474 if 'threads-source' in numa:
475 use_cpu_pinning=True
476 for index in range(0, len(numa['threads-source'])):
477 cpu_pinning.append( [ numa['threads-id'][index], numa['threads-source'][index] ] )
478 vcpus += 1
479 if 'paired-threads-source' in numa:
480 use_cpu_pinning=True
481 for index in range(0, len(numa['paired-threads-source'])):
482 cpu_pinning.append( [numa['paired-threads-id'][index][0], numa['paired-threads-source'][index][0] ] )
483 cpu_pinning.append( [numa['paired-threads-id'][index][1], numa['paired-threads-source'][index][1] ] )
484 vcpus += 2
485
486 if use_cpu_pinning and not self.develop_mode:
487 text += self.tab()+"<vcpu placement='static'>" +str(len(cpu_pinning)) +"</vcpu>" + \
488 self.tab()+'<cputune>'
489 self.xml_level += 1
490 for i in range(0, len(cpu_pinning)):
491 text += self.tab() + "<vcpupin vcpu='" +str(cpu_pinning[i][0])+ "' cpuset='" +str(cpu_pinning[i][1]) +"'/>"
492 text += self.dec_tab()+'</cputune>'+ \
493 self.tab() + '<numatune>' +\
494 self.inc_tab() + "<memory mode='strict' nodeset='" +str(numa['source'])+ "'/>" +\
495 self.dec_tab() + '</numatune>'
496 else:
497 if vcpus==0:
498 return -1, "Instance without number of cpus"
499 text += self.tab()+"<vcpu>" + str(vcpus) + "</vcpu>"
500
501 #boot
502 boot_cdrom = False
503 for dev in dev_list:
504 if dev['type']=='cdrom' :
505 boot_cdrom = True
506 break
507 text += self.tab()+ '<os>' + \
508 self.inc_tab() + "<type arch='x86_64' machine='pc'>hvm</type>"
509 if boot_cdrom:
510 text += self.tab() + "<boot dev='cdrom'/>"
511 text += self.tab() + "<boot dev='hd'/>" + \
512 self.dec_tab()+'</os>'
513 #features
514 text += self.tab()+'<features>'+\
515 self.inc_tab()+'<acpi/>' +\
516 self.tab()+'<apic/>' +\
517 self.tab()+'<pae/>'+ \
518 self.dec_tab() +'</features>'
519 if topo == "oneSocket:hyperthreading":
520 if vcpus % 2 != 0:
521 return -1, 'Cannot expose hyperthreading with an odd number of vcpus'
522 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='2' /> </cpu>" % vcpus/2
523 elif windows_os or topo == "oneSocket":
524 text += self.tab() + "<cpu mode='host-model'> <topology sockets='1' cores='%d' threads='1' /> </cpu>" % vcpus
525 else:
526 text += self.tab() + "<cpu mode='host-model'></cpu>"
527 text += self.tab() + "<clock offset='utc'/>" +\
528 self.tab() + "<on_poweroff>preserve</on_poweroff>" + \
529 self.tab() + "<on_reboot>restart</on_reboot>" + \
530 self.tab() + "<on_crash>restart</on_crash>"
531 text += self.tab() + "<devices>" + \
532 self.inc_tab() + "<emulator>/usr/libexec/qemu-kvm</emulator>" + \
533 self.tab() + "<serial type='pty'>" +\
534 self.inc_tab() + "<target port='0'/>" + \
535 self.dec_tab() + "</serial>" +\
536 self.tab() + "<console type='pty'>" + \
537 self.inc_tab()+ "<target type='serial' port='0'/>" + \
538 self.dec_tab()+'</console>'
539 if windows_os:
540 text += self.tab() + "<controller type='usb' index='0'/>" + \
541 self.tab() + "<controller type='ide' index='0'/>" + \
542 self.tab() + "<input type='mouse' bus='ps2'/>" + \
543 self.tab() + "<sound model='ich6'/>" + \
544 self.tab() + "<video>" + \
545 self.inc_tab() + "<model type='cirrus' vram='9216' heads='1'/>" + \
546 self.dec_tab() + "</video>" + \
547 self.tab() + "<memballoon model='virtio'/>" + \
548 self.tab() + "<input type='tablet' bus='usb'/>" #TODO revisar
549
550 #> self.tab()+'<alias name=\'hostdev0\'/>\n' +\
551 #> self.dec_tab()+'</hostdev>\n' +\
552 #> self.tab()+'<input type=\'tablet\' bus=\'usb\'/>\n'
553 if windows_os:
554 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes'/>"
555 else:
556 #If image contains 'GRAPH' include graphics
557 #if 'GRAPH' in image:
558 text += self.tab() + "<graphics type='vnc' port='-1' autoport='yes' listen='0.0.0.0'>" +\
559 self.inc_tab() + "<listen type='address' address='0.0.0.0'/>" +\
560 self.dec_tab() + "</graphics>"
561
562 vd_index = 'a'
563 for dev in dev_list:
564 bus_ide_dev = bus_ide
565 if dev['type']=='cdrom' or dev['type']=='disk':
566 if dev['type']=='cdrom':
567 bus_ide_dev = True
568 text += self.tab() + "<disk type='file' device='"+dev['type']+"'>"
569 if 'file format' in dev:
570 text += self.inc_tab() + "<driver name='qemu' type='" +dev['file format']+ "' cache='writethrough'/>"
571 if 'source file' in dev:
572 text += self.tab() + "<source file='" +dev['source file']+ "'/>"
573 #elif v['type'] == 'block':
574 # text += self.tab() + "<source dev='" + v['source'] + "'/>"
575 #else:
576 # return -1, 'Unknown disk type ' + v['type']
577 vpci = dev.get('vpci',None)
578 if vpci == None:
579 vpci = dev['metadata'].get('vpci',None)
580 text += self.pci2xml(vpci)
581
582 if bus_ide_dev:
583 text += self.tab() + "<target dev='hd" +vd_index+ "' bus='ide'/>" #TODO allows several type of disks
584 else:
585 text += self.tab() + "<target dev='vd" +vd_index+ "' bus='virtio'/>"
586 text += self.dec_tab() + '</disk>'
587 vd_index = chr(ord(vd_index)+1)
588 elif dev['type']=='xml':
589 dev_text = dev['xml']
590 if 'vpci' in dev:
591 dev_text = dev_text.replace('__vpci__', dev['vpci'])
592 if 'source file' in dev:
593 dev_text = dev_text.replace('__file__', dev['source file'])
594 if 'file format' in dev:
595 dev_text = dev_text.replace('__format__', dev['source file'])
596 if '__dev__' in dev_text:
597 dev_text = dev_text.replace('__dev__', vd_index)
598 vd_index = chr(ord(vd_index)+1)
599 text += dev_text
600 else:
601 return -1, 'Unknown device type ' + dev['type']
602
603 net_nb=0
604 bridge_interfaces = server.get('networks', [])
605 for v in bridge_interfaces:
606 #Get the brifge name
607 self.db_lock.acquire()
608 result, content = self.db.get_table(FROM='nets', SELECT=('provider',),WHERE={'uuid':v['net_id']} )
609 self.db_lock.release()
610 if result <= 0:
611 self.logger.error("create_xml_server ERROR %d getting nets %s", result, content)
612 return -1, content
613 #ALF: Allow by the moment the 'default' bridge net because is confortable for provide internet to VM
614 #I know it is not secure
615 #for v in sorted(desc['network interfaces'].itervalues()):
616 model = v.get("model", None)
617 if content[0]['provider']=='default':
618 text += self.tab() + "<interface type='network'>" + \
619 self.inc_tab() + "<source network='" +content[0]['provider']+ "'/>"
620 elif content[0]['provider'][0:7]=='macvtap':
621 text += self.tab()+"<interface type='direct'>" + \
622 self.inc_tab() + "<source dev='" + self.get_local_iface_name(content[0]['provider'][8:]) + "' mode='bridge'/>" + \
623 self.tab() + "<target dev='macvtap0'/>"
624 if windows_os:
625 text += self.tab() + "<alias name='net" + str(net_nb) + "'/>"
626 elif model==None:
627 model = "virtio"
628 elif content[0]['provider'][0:6]=='bridge':
629 text += self.tab() + "<interface type='bridge'>" + \
630 self.inc_tab()+"<source bridge='" +self.get_local_iface_name(content[0]['provider'][7:])+ "'/>"
631 if windows_os:
632 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
633 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
634 elif model==None:
635 model = "virtio"
636 elif content[0]['provider'][0:3] == "OVS":
637 vlan = content[0]['provider'].replace('OVS:', '')
638 text += self.tab() + "<interface type='bridge'>" + \
639 self.inc_tab() + "<source bridge='ovim-" + str(vlan) + "'/>"
640 else:
641 return -1, 'Unknown Bridge net provider ' + content[0]['provider']
642 if model!=None:
643 text += self.tab() + "<model type='" +model+ "'/>"
644 if v.get('mac_address', None) != None:
645 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
646 text += self.pci2xml(v.get('vpci',None))
647 text += self.dec_tab()+'</interface>'
648
649 net_nb += 1
650
651 interfaces = numa.get('interfaces', [])
652
653 net_nb=0
654 for v in interfaces:
655 if self.develop_mode: #map these interfaces to bridges
656 text += self.tab() + "<interface type='bridge'>" + \
657 self.inc_tab()+"<source bridge='" +self.develop_bridge_iface+ "'/>"
658 if windows_os:
659 text += self.tab() + "<target dev='vnet" + str(net_nb)+ "'/>" +\
660 self.tab() + "<alias name='net" + str(net_nb)+ "'/>"
661 else:
662 text += self.tab() + "<model type='e1000'/>" #e1000 is more probable to be supported than 'virtio'
663 if v.get('mac_address', None) != None:
664 text+= self.tab() +"<mac address='" +v['mac_address']+ "'/>"
665 text += self.pci2xml(v.get('vpci',None))
666 text += self.dec_tab()+'</interface>'
667 continue
668
669 if v['dedicated'] == 'yes': #passthrought
670 text += self.tab() + "<hostdev mode='subsystem' type='pci' managed='yes'>" + \
671 self.inc_tab() + "<source>"
672 self.inc_tab()
673 text += self.pci2xml(v['source'])
674 text += self.dec_tab()+'</source>'
675 text += self.pci2xml(v.get('vpci',None))
676 if windows_os:
677 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
678 text += self.dec_tab()+'</hostdev>'
679 net_nb += 1
680 else: #sriov_interfaces
681 #skip not connected interfaces
682 if v.get("net_id") == None:
683 continue
684 text += self.tab() + "<interface type='hostdev' managed='yes'>"
685 self.inc_tab()
686 if v.get('mac_address', None) != None:
687 text+= self.tab() + "<mac address='" +v['mac_address']+ "'/>"
688 text+= self.tab()+'<source>'
689 self.inc_tab()
690 text += self.pci2xml(v['source'])
691 text += self.dec_tab()+'</source>'
692 if v.get('vlan',None) != None:
693 text += self.tab() + "<vlan> <tag id='" + str(v['vlan']) + "'/> </vlan>"
694 text += self.pci2xml(v.get('vpci',None))
695 if windows_os:
696 text += self.tab() + "<alias name='hostdev" + str(net_nb) + "'/>"
697 text += self.dec_tab()+'</interface>'
698
699
700 text += self.dec_tab()+'</devices>'+\
701 self.dec_tab()+'</domain>'
702 return 0, text
703
704 def pci2xml(self, pci):
705 '''from a pci format text XXXX:XX:XX.X generates the xml content of <address>
706 alows an empty pci text'''
707 if pci is None:
708 return ""
709 first_part = pci.split(':')
710 second_part = first_part[2].split('.')
711 return self.tab() + "<address type='pci' domain='0x" + first_part[0] + \
712 "' bus='0x" + first_part[1] + "' slot='0x" + second_part[0] + \
713 "' function='0x" + second_part[1] + "'/>"
714
715 def tab(self):
716 """Return indentation according to xml_level"""
717 return "\n" + (' '*self.xml_level)
718
719 def inc_tab(self):
720 """Increment and return indentation according to xml_level"""
721 self.xml_level += 1
722 return self.tab()
723
724 def dec_tab(self):
725 """Decrement and return indentation according to xml_level"""
726 self.xml_level -= 1
727 return self.tab()
728
729 def create_ovs_bridge(self):
730 """
731 Create a bridge in compute OVS to allocate VMs
732 :return: True if success
733 """
734 if self.test:
735 return True
736 try:
737 command = 'sudo ovs-vsctl --may-exist add-br br-int -- set Bridge br-int stp_enable=true'
738 self.logger.debug("command: " + command)
739 (_, stdout, _) = self.ssh_conn.exec_command(command)
740 content = stdout.read()
741 if len(content) == 0:
742 return True
743 else:
744 return False
745 except paramiko.ssh_exception.SSHException as e:
746 self.logger.error("create_ovs_bridge ssh Exception: " + str(e))
747 if "SSH session not active" in str(e):
748 self.ssh_connect()
749 return False
750
751 def delete_port_to_ovs_bridge(self, vlan, net_uuid):
752 """
753 Delete linux bridge port attched to a OVS bridge, if port is not free the port is not removed
754 :param vlan: vlan port id
755 :param net_uuid: network id
756 :return:
757 """
758
759 if self.test:
760 return True
761 try:
762 port_name = 'ovim-' + str(vlan)
763 command = 'sudo ovs-vsctl del-port br-int ' + port_name
764 self.logger.debug("command: " + command)
765 (_, stdout, _) = self.ssh_conn.exec_command(command)
766 content = stdout.read()
767 if len(content) == 0:
768 return True
769 else:
770 return False
771 except paramiko.ssh_exception.SSHException as e:
772 self.logger.error("delete_port_to_ovs_bridge ssh Exception: " + str(e))
773 if "SSH session not active" in str(e):
774 self.ssh_connect()
775 return False
776
    def delete_dhcp_server(self, vlan, net_uuid, dhcp_path):
        """
        Delete dhcp server process living in namespace
        :param vlan: segmentation id
        :param net_uuid: network uuid
        :param dhcp_path: conf file path that lives in namespace side
        :return: True when nothing must be done (test mode or ports still in
                 use); False on ssh failure; None after attempting the kill
                 (success of the kill itself is not checked)
        """
        if self.test:
            return True
        # nothing to do while some port still uses this net's dhcp
        if not self.is_dhcp_port_free(vlan, net_uuid):
            return True
        try:
            net_namespace = 'ovim-' + str(vlan)
            dhcp_path = os.path.join(dhcp_path, net_namespace)
            pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')

            # read the dnsmasq pid from inside the namespace
            command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_file
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # kill it ('content' is the pid text read above)
            command = 'sudo ip netns exec ' + net_namespace + ' kill -9 ' + content
            self.logger.debug("command: " + command)
            (_, stdout, _) = self.ssh_conn.exec_command(command)
            content = stdout.read()

            # if len(content) == 0:
            #     return True
            # else:
            #     return False
        except paramiko.ssh_exception.SSHException as e:
            self.logger.error("delete_dhcp_server ssh Exception: " + str(e))
            if "SSH session not active" in str(e):
                self.ssh_connect()
            return False
813
814 def is_dhcp_port_free(self, host_id, net_uuid):
815 """
816 Check if any port attached to the a net in a vxlan mesh across computes nodes
817 :param host_id: host id
818 :param net_uuid: network id
819 :return: True if is not free
820 """
821 self.db_lock.acquire()
822 result, content = self.db.get_table(
823 FROM='ports',
824 WHERE={'type': 'instance:ovs', 'net_id': net_uuid}
825 )
826 self.db_lock.release()
827
828 if len(content) > 0:
829 return False
830 else:
831 return True
832
833 def is_port_free(self, host_id, net_uuid):
834 """
835 Check if there not ovs ports of a network in a compute host.
836 :param host_id: host id
837 :param net_uuid: network id
838 :return: True if is not free
839 """
840
841 self.db_lock.acquire()
842 result, content = self.db.get_table(
843 FROM='ports as p join instances as i on p.instance_id=i.uuid',
844 WHERE={"i.host_id": self.host_id, 'p.type': 'instance:ovs', 'p.net_id': net_uuid}
845 )
846 self.db_lock.release()
847
848 if len(content) > 0:
849 return False
850 else:
851 return True
852
853 def add_port_to_ovs_bridge(self, vlan):
854 """
855 Add a bridge linux as a port to a OVS bridge and set a vlan for an specific linux bridge
856 :param vlan: vlan port id
857 :return: True if success
858 """
859
860 if self.test:
861 return True
862 try:
863 port_name = 'ovim-' + str(vlan)
864 command = 'sudo ovs-vsctl add-port br-int ' + port_name + ' tag=' + str(vlan)
865 self.logger.debug("command: " + command)
866 (_, stdout, _) = self.ssh_conn.exec_command(command)
867 content = stdout.read()
868 if len(content) == 0:
869 return True
870 else:
871 return False
872 except paramiko.ssh_exception.SSHException as e:
873 self.logger.error("add_port_to_ovs_bridge ssh Exception: " + str(e))
874 if "SSH session not active" in str(e):
875 self.ssh_connect()
876 return False
877
878 def delete_dhcp_port(self, vlan, net_uuid):
879 """
880 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
881 :param vlan: segmentation id
882 :param net_uuid: network id
883 :return: True if success
884 """
885
886 if self.test:
887 return True
888
889 if not self.is_dhcp_port_free(vlan, net_uuid):
890 return True
891 self.delete_dhcp_interfaces(vlan)
892 return True
893
894 def delete_bridge_port_attached_to_ovs(self, vlan, net_uuid):
895 """
896 Delete from an existing OVS bridge a linux bridge port attached and the linux bridge itself.
897 :param vlan:
898 :param net_uuid:
899 :return: True if success
900 """
901 if self.test:
902 return
903
904 if not self.is_port_free(vlan, net_uuid):
905 return True
906 self.delete_port_to_ovs_bridge(vlan, net_uuid)
907 self.delete_linux_bridge(vlan)
908 return True
909
910 def delete_linux_bridge(self, vlan):
911 """
912 Delete a linux bridge in a scpecific compute.
913 :param vlan: vlan port id
914 :return: True if success
915 """
916
917 if self.test:
918 return True
919 try:
920 port_name = 'ovim-' + str(vlan)
921 command = 'sudo ip link set dev veth0-' + str(vlan) + ' down'
922 self.logger.debug("command: " + command)
923 (_, stdout, _) = self.ssh_conn.exec_command(command)
924 # content = stdout.read()
925 #
926 # if len(content) != 0:
927 # return False
928 command = 'sudo ifconfig ' + port_name + ' down && sudo brctl delbr ' + port_name
929 self.logger.debug("command: " + command)
930 (_, stdout, _) = self.ssh_conn.exec_command(command)
931 content = stdout.read()
932 if len(content) == 0:
933 return True
934 else:
935 return False
936 except paramiko.ssh_exception.SSHException as e:
937 self.logger.error("delete_linux_bridge ssh Exception: " + str(e))
938 if "SSH session not active" in str(e):
939 self.ssh_connect()
940 return False
941
942 def create_ovs_bridge_port(self, vlan):
943 """
944 Generate a linux bridge and attache the port to a OVS bridge
945 :param vlan: vlan port id
946 :return:
947 """
948 if self.test:
949 return
950 self.create_linux_bridge(vlan)
951 self.add_port_to_ovs_bridge(vlan)
952
953 def create_linux_bridge(self, vlan):
954 """
955 Create a linux bridge with STP active
956 :param vlan: netowrk vlan id
957 :return:
958 """
959
960 if self.test:
961 return True
962 try:
963 port_name = 'ovim-' + str(vlan)
964 command = 'sudo brctl show | grep ' + port_name
965 self.logger.debug("command: " + command)
966 (_, stdout, _) = self.ssh_conn.exec_command(command)
967 content = stdout.read()
968
969 # if exist nothing to create
970 # if len(content) == 0:
971 # return False
972
973 command = 'sudo brctl addbr ' + port_name
974 self.logger.debug("command: " + command)
975 (_, stdout, _) = self.ssh_conn.exec_command(command)
976 content = stdout.read()
977
978 # if len(content) == 0:
979 # return True
980 # else:
981 # return False
982
983 command = 'sudo brctl stp ' + port_name + ' on'
984 self.logger.debug("command: " + command)
985 (_, stdout, _) = self.ssh_conn.exec_command(command)
986 content = stdout.read()
987
988 # if len(content) == 0:
989 # return True
990 # else:
991 # return False
992 command = 'sudo ip link set dev ' + port_name + ' up'
993 self.logger.debug("command: " + command)
994 (_, stdout, _) = self.ssh_conn.exec_command(command)
995 content = stdout.read()
996
997 if len(content) == 0:
998 return True
999 else:
1000 return False
1001 except paramiko.ssh_exception.SSHException as e:
1002 self.logger.error("create_linux_bridge ssh Exception: " + str(e))
1003 if "SSH session not active" in str(e):
1004 self.ssh_connect()
1005 return False
1006
1007 def set_mac_dhcp_server(self, ip, mac, vlan, netmask, dhcp_path):
1008 """
1009 Write into dhcp conf file a rule to assigned a fixed ip given to an specific MAC address
1010 :param ip: IP address asigned to a VM
1011 :param mac: VM vnic mac to be macthed with the IP received
1012 :param vlan: Segmentation id
1013 :param netmask: netmask value
1014 :param path: dhcp conf file path that live in namespace side
1015 :return: True if success
1016 """
1017
1018 if self.test:
1019 return True
1020
1021 net_namespace = 'ovim-' + str(vlan)
1022 dhcp_path = os.path.join(dhcp_path, net_namespace)
1023 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1024
1025 if not ip:
1026 return False
1027 try:
1028 ip_data = mac.upper() + ',' + ip
1029
1030 command = 'sudo ip netns exec ' + net_namespace + ' touch ' + dhcp_hostsdir
1031 self.logger.debug("command: " + command)
1032 (_, stdout, _) = self.ssh_conn.exec_command(command)
1033 content = stdout.read()
1034
1035 command = 'sudo ip netns exec ' + net_namespace + ' sudo bash -ec "echo ' + ip_data + ' >> ' + dhcp_hostsdir + '"'
1036
1037 self.logger.debug("command: " + command)
1038 (_, stdout, _) = self.ssh_conn.exec_command(command)
1039 content = stdout.read()
1040
1041 if len(content) == 0:
1042 return True
1043 else:
1044 return False
1045 except paramiko.ssh_exception.SSHException as e:
1046 self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
1047 if "SSH session not active" in str(e):
1048 self.ssh_connect()
1049 return False
1050
1051 def delete_mac_dhcp_server(self, ip, mac, vlan, dhcp_path):
1052 """
1053 Delete into dhcp conf file the ip assigned to a specific MAC address
1054
1055 :param ip: IP address asigned to a VM
1056 :param mac: VM vnic mac to be macthed with the IP received
1057 :param vlan: Segmentation id
1058 :param dhcp_path: dhcp conf file path that live in namespace side
1059 :return:
1060 """
1061
1062 if self.test:
1063 return False
1064 try:
1065 net_namespace = 'ovim-' + str(vlan)
1066 dhcp_path = os.path.join(dhcp_path, net_namespace)
1067 dhcp_hostsdir = os.path.join(dhcp_path, net_namespace)
1068
1069 if not ip:
1070 return False
1071
1072 ip_data = mac.upper() + ',' + ip
1073
1074 command = 'sudo ip netns exec ' + net_namespace + ' sudo sed -i \'/' + ip_data + '/d\' ' + dhcp_hostsdir
1075 self.logger.debug("command: " + command)
1076 (_, stdout, _) = self.ssh_conn.exec_command(command)
1077 content = stdout.read()
1078
1079 if len(content) == 0:
1080 return True
1081 else:
1082 return False
1083
1084 except paramiko.ssh_exception.SSHException as e:
1085 self.logger.error("set_mac_dhcp_server ssh Exception: " + str(e))
1086 if "SSH session not active" in str(e):
1087 self.ssh_connect()
1088 return False
1089
1090 def launch_dhcp_server(self, vlan, ip_range, netmask, dhcp_path, gateway):
1091 """
1092 Generate a linux bridge and attache the port to a OVS bridge
1093 :param self:
1094 :param vlan: Segmentation id
1095 :param ip_range: IP dhcp range
1096 :param netmask: network netmask
1097 :param dhcp_path: dhcp conf file path that live in namespace side
1098 :param gateway: Gateway address for dhcp net
1099 :return: True if success
1100 """
1101
1102 if self.test:
1103 return True
1104 try:
1105 interface = 'tap-' + str(vlan)
1106 net_namespace = 'ovim-' + str(vlan)
1107 dhcp_path = os.path.join(dhcp_path, net_namespace)
1108 leases_path = os.path.join(dhcp_path, "dnsmasq.leases")
1109 pid_file = os.path.join(dhcp_path, 'dnsmasq.pid')
1110
1111 dhcp_range = ip_range[0] + ',' + ip_range[1] + ',' + netmask
1112
1113 command = 'sudo ip netns exec ' + net_namespace + ' mkdir -p ' + dhcp_path
1114 self.logger.debug("command: " + command)
1115 (_, stdout, _) = self.ssh_conn.exec_command(command)
1116 content = stdout.read()
1117
1118 pid_path = os.path.join(dhcp_path, 'dnsmasq.pid')
1119 command = 'sudo ip netns exec ' + net_namespace + ' cat ' + pid_path
1120 self.logger.debug("command: " + command)
1121 (_, stdout, _) = self.ssh_conn.exec_command(command)
1122 content = stdout.read()
1123 # check if pid is runing
1124 pid_status_path = content
1125 if content:
1126 command = "ps aux | awk '{print $2 }' | grep " + pid_status_path
1127 self.logger.debug("command: " + command)
1128 (_, stdout, _) = self.ssh_conn.exec_command(command)
1129 content = stdout.read()
1130 if not content:
1131 command = 'sudo ip netns exec ' + net_namespace + ' /usr/sbin/dnsmasq --strict-order --except-interface=lo ' \
1132 '--interface=' + interface + ' --bind-interfaces --dhcp-hostsdir=' + dhcp_path + \
1133 ' --dhcp-range ' + dhcp_range + ' --pid-file=' + pid_file + ' --dhcp-leasefile=' + leases_path + \
1134 ' --listen-address ' + gateway
1135
1136 self.logger.debug("command: " + command)
1137 (_, stdout, _) = self.ssh_conn.exec_command(command)
1138 content = stdout.readline()
1139
1140 if len(content) == 0:
1141 return True
1142 else:
1143 return False
1144 except paramiko.ssh_exception.SSHException as e:
1145 self.logger.error("launch_dhcp_server ssh Exception: " + str(e))
1146 if "SSH session not active" in str(e):
1147 self.ssh_connect()
1148 return False
1149
1150 def delete_dhcp_interfaces(self, vlan):
1151 """
1152 Create a linux bridge with STP active
1153 :param vlan: netowrk vlan id
1154 :return:
1155 """
1156
1157 if self.test:
1158 return True
1159 try:
1160 net_namespace = 'ovim-' + str(vlan)
1161 command = 'sudo ovs-vsctl del-port br-int ovs-tap-' + str(vlan)
1162 self.logger.debug("command: " + command)
1163 (_, stdout, _) = self.ssh_conn.exec_command(command)
1164 content = stdout.read()
1165
1166 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' down'
1167 self.logger.debug("command: " + command)
1168 (_, stdout, _) = self.ssh_conn.exec_command(command)
1169 content = stdout.read()
1170
1171 command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' down'
1172 self.logger.debug("command: " + command)
1173 (_, stdout, _) = self.ssh_conn.exec_command(command)
1174 content = stdout.read()
1175 except paramiko.ssh_exception.SSHException as e:
1176 self.logger.error("delete_dhcp_interfaces ssh Exception: " + str(e))
1177 if "SSH session not active" in str(e):
1178 self.ssh_connect()
1179 return False
1180
1181 def create_dhcp_interfaces(self, vlan, ip_listen_address, netmask):
1182 """
1183 Create a linux bridge with STP active
1184 :param vlan: segmentation id
1185 :param ip_listen_address: Listen Ip address for the dhcp service, the tap interface living in namesapce side
1186 :param netmask: dhcp net CIDR
1187 :return: True if success
1188 """
1189
1190 if self.test:
1191 return True
1192 try:
1193 net_namespace = 'ovim-' + str(vlan)
1194 namespace_interface = 'tap-' + str(vlan)
1195
1196 command = 'sudo ip netns add ' + net_namespace
1197 self.logger.debug("command: " + command)
1198 (_, stdout, _) = self.ssh_conn.exec_command(command)
1199 content = stdout.read()
1200
1201 command = 'sudo ip link add tap-' + str(vlan) + ' type veth peer name ovs-tap-' + str(vlan)
1202 self.logger.debug("command: " + command)
1203 (_, stdout, _) = self.ssh_conn.exec_command(command)
1204 content = stdout.read()
1205
1206 command = 'sudo ovs-vsctl add-port br-int ovs-tap-' + str(vlan) + ' tag=' + str(vlan)
1207 self.logger.debug("command: " + command)
1208 (_, stdout, _) = self.ssh_conn.exec_command(command)
1209 content = stdout.read()
1210
1211 command = 'sudo ip link set tap-' + str(vlan) + ' netns ' + net_namespace
1212 self.logger.debug("command: " + command)
1213 (_, stdout, _) = self.ssh_conn.exec_command(command)
1214 content = stdout.read()
1215
1216 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev tap-' + str(vlan) + ' up'
1217 self.logger.debug("command: " + command)
1218 (_, stdout, _) = self.ssh_conn.exec_command(command)
1219 content = stdout.read()
1220
1221 command = 'sudo ip link set dev ovs-tap-' + str(vlan) + ' up'
1222 self.logger.debug("command: " + command)
1223 (_, stdout, _) = self.ssh_conn.exec_command(command)
1224 content = stdout.read()
1225
1226 command = 'sudo ip netns exec ' + net_namespace + ' ip link set dev lo up'
1227 self.logger.debug("command: " + command)
1228 (_, stdout, _) = self.ssh_conn.exec_command(command)
1229 content = stdout.read()
1230
1231 command = 'sudo ip netns exec ' + net_namespace + ' ' + ' ifconfig ' + namespace_interface \
1232 + ' ' + ip_listen_address + ' netmask ' + netmask
1233 self.logger.debug("command: " + command)
1234 (_, stdout, _) = self.ssh_conn.exec_command(command)
1235 content = stdout.read()
1236
1237 if len(content) == 0:
1238 return True
1239 else:
1240 return False
1241 except paramiko.ssh_exception.SSHException as e:
1242 self.logger.error("create_dhcp_interfaces ssh Exception: " + str(e))
1243 if "SSH session not active" in str(e):
1244 self.ssh_connect()
1245 return False
1246
1247
1248 def create_ovs_vxlan_tunnel(self, vxlan_interface, remote_ip):
1249 """
1250 Create a vlxn tunnel between to computes with an OVS installed. STP is also active at port level
1251 :param vxlan_interface: vlxan inteface name.
1252 :param remote_ip: tunnel endpoint remote compute ip.
1253 :return:
1254 """
1255 if self.test:
1256 return True
1257 try:
1258 command = 'sudo ovs-vsctl add-port br-int ' + vxlan_interface + \
1259 ' -- set Interface ' + vxlan_interface + ' type=vxlan options:remote_ip=' + remote_ip + \
1260 ' -- set Port ' + vxlan_interface + ' other_config:stp-path-cost=10'
1261 self.logger.debug("command: " + command)
1262 (_, stdout, _) = self.ssh_conn.exec_command(command)
1263 content = stdout.read()
1264 # print content
1265 if len(content) == 0:
1266 return True
1267 else:
1268 return False
1269 except paramiko.ssh_exception.SSHException as e:
1270 self.logger.error("create_ovs_vxlan_tunnel ssh Exception: " + str(e))
1271 if "SSH session not active" in str(e):
1272 self.ssh_connect()
1273 return False
1274
1275 def delete_ovs_vxlan_tunnel(self, vxlan_interface):
1276 """
1277 Delete a vlxan tunnel port from a OVS brdige.
1278 :param vxlan_interface: vlxan name to be delete it.
1279 :return: True if success.
1280 """
1281 if self.test:
1282 return True
1283 try:
1284 command = 'sudo ovs-vsctl del-port br-int ' + vxlan_interface
1285 self.logger.debug("command: " + command)
1286 (_, stdout, _) = self.ssh_conn.exec_command(command)
1287 content = stdout.read()
1288 # print content
1289 if len(content) == 0:
1290 return True
1291 else:
1292 return False
1293 except paramiko.ssh_exception.SSHException as e:
1294 self.logger.error("delete_ovs_vxlan_tunnel ssh Exception: " + str(e))
1295 if "SSH session not active" in str(e):
1296 self.ssh_connect()
1297 return False
1298
1299 def delete_ovs_bridge(self):
1300 """
1301 Delete a OVS bridge from a compute.
1302 :return: True if success
1303 """
1304 if self.test:
1305 return True
1306 try:
1307 command = 'sudo ovs-vsctl del-br br-int'
1308 self.logger.debug("command: " + command)
1309 (_, stdout, _) = self.ssh_conn.exec_command(command)
1310 content = stdout.read()
1311 if len(content) == 0:
1312 return True
1313 else:
1314 return False
1315 except paramiko.ssh_exception.SSHException as e:
1316 self.logger.error("delete_ovs_bridge ssh Exception: " + str(e))
1317 if "SSH session not active" in str(e):
1318 self.ssh_connect()
1319 return False
1320
1321 def get_file_info(self, path):
1322 command = 'ls -lL --time-style=+%Y-%m-%dT%H:%M:%S ' + path
1323 self.logger.debug("command: " + command)
1324 (_, stdout, _) = self.ssh_conn.exec_command(command)
1325 content = stdout.read()
1326 if len(content) == 0:
1327 return None # file does not exist
1328 else:
1329 return content.split(" ") # (permission, 1, owner, group, size, date, file)
1330
1331 def qemu_get_info(self, path):
1332 command = 'qemu-img info ' + path
1333 self.logger.debug("command: " + command)
1334 (_, stdout, stderr) = self.ssh_conn.exec_command(command)
1335 content = stdout.read()
1336 if len(content) == 0:
1337 error = stderr.read()
1338 self.logger.error("get_qemu_info error " + error)
1339 raise paramiko.ssh_exception.SSHException("Error getting qemu_info: " + error)
1340 else:
1341 try:
1342 return yaml.load(content)
1343 except yaml.YAMLError as exc:
1344 text = ""
1345 if hasattr(exc, 'problem_mark'):
1346 mark = exc.problem_mark
1347 text = " at position: (%s:%s)" % (mark.line+1, mark.column+1)
1348 self.logger.error("get_qemu_info yaml format Exception " + text)
1349 raise paramiko.ssh_exception.SSHException("Error getting qemu_info yaml format" + text)
1350
1351 def qemu_change_backing(self, inc_file, new_backing_file):
1352 command = 'qemu-img rebase -u -b ' + new_backing_file + ' ' + inc_file
1353 self.logger.debug("command: " + command)
1354 (_, _, stderr) = self.ssh_conn.exec_command(command)
1355 content = stderr.read()
1356 if len(content) == 0:
1357 return 0
1358 else:
1359 self.logger.error("qemu_change_backing error: " + content)
1360 return -1
1361
1362 def get_notused_filename(self, proposed_name, suffix=''):
1363 '''Look for a non existing file_name in the host
1364 proposed_name: proposed file name, includes path
1365 suffix: suffix to be added to the name, before the extention
1366 '''
1367 extension = proposed_name.rfind(".")
1368 slash = proposed_name.rfind("/")
1369 if extension < 0 or extension < slash: # no extension
1370 extension = len(proposed_name)
1371 target_name = proposed_name[:extension] + suffix + proposed_name[extension:]
1372 info = self.get_file_info(target_name)
1373 if info is None:
1374 return target_name
1375
1376 index=0
1377 while info is not None:
1378 target_name = proposed_name[:extension] + suffix + "-" + str(index) + proposed_name[extension:]
1379 index+=1
1380 info = self.get_file_info(target_name)
1381 return target_name
1382
1383 def get_notused_path(self, proposed_path, suffix=''):
1384 '''Look for a non existing path at database for images
1385 proposed_path: proposed file name, includes path
1386 suffix: suffix to be added to the name, before the extention
1387 '''
1388 extension = proposed_path.rfind(".")
1389 if extension < 0:
1390 extension = len(proposed_path)
1391 if suffix != None:
1392 target_path = proposed_path[:extension] + suffix + proposed_path[extension:]
1393 index=0
1394 while True:
1395 r,_=self.db.get_table(FROM="images",WHERE={"path":target_path})
1396 if r<=0:
1397 return target_path
1398 target_path = proposed_path[:extension] + suffix + "-" + str(index) + proposed_path[extension:]
1399 index+=1
1400
1401
1402 def delete_file(self, file_name):
1403 command = 'rm -f '+file_name
1404 self.logger.debug("command: " + command)
1405 (_, _, stderr) = self.ssh_conn.exec_command(command)
1406 error_msg = stderr.read()
1407 if len(error_msg) > 0:
1408 raise paramiko.ssh_exception.SSHException("Error deleting file: " + error_msg)
1409
1410 def copy_file(self, source, destination, perserve_time=True):
1411 if source[0:4]=="http":
1412 command = "wget --no-verbose -O '{dst}' '{src}' 2>'{dst_result}' || cat '{dst_result}' >&2 && rm '{dst_result}'".format(
1413 dst=destination, src=source, dst_result=destination + ".result" )
1414 else:
1415 command = 'cp --no-preserve=mode'
1416 if perserve_time:
1417 command += ' --preserve=timestamps'
1418 command += " '{}' '{}'".format(source, destination)
1419 self.logger.debug("command: " + command)
1420 (_, _, stderr) = self.ssh_conn.exec_command(command)
1421 error_msg = stderr.read()
1422 if len(error_msg) > 0:
1423 raise paramiko.ssh_exception.SSHException("Error copying image to local host: " + error_msg)
1424
    def copy_remote_file(self, remote_file, use_incremental):
        '''Copy a file from the repository to local folder and recursively
        copy the backing files in case the remote file is incremental.
        Reads and/or modifies self.localinfo['files'], the cache that maps
        repository paths to the unmodified local copies of images.
        params:
            remote_file: path of remote file (http url or repository path)
            use_incremental: None (leave the decision to this function), True, False
        return:
            local_file: name of local file
            qemu_info: dict with qemu-img information of the local file
            use_incremental_out: True, False; same as use_incremental, but if None a decision is taken
        '''

        use_incremental_out = use_incremental
        new_backing_file = None
        local_file = None
        file_from_local = True

        #in case incremental use is not decided, take the decision depending on the image
        #avoid the use of incremental if this image is already incremental
        if remote_file[0:4] == "http":
            file_from_local = False
        if file_from_local:
            qemu_remote_info = self.qemu_get_info(remote_file)
        if use_incremental_out==None:
            use_incremental_out = not ( file_from_local and 'backing file' in qemu_remote_info)
        #copy recursivelly the backing files
        if file_from_local and 'backing file' in qemu_remote_info:
            new_backing_file, _, _ = self.copy_remote_file(qemu_remote_info['backing file'], True)

        #check if remote file is present locally
        if use_incremental_out and remote_file in self.localinfo['files']:
            local_file = self.localinfo['files'][remote_file]
            local_file_info = self.get_file_info(local_file)
            if file_from_local:
                remote_file_info = self.get_file_info(remote_file)
            if local_file_info == None:
                local_file = None
            elif file_from_local and (local_file_info[4]!=remote_file_info[4] or local_file_info[5]!=remote_file_info[5]):
                #local copy of file not valid because date or size are different.
                #TODO DELETE local file if this file is not used by any active virtual machine
                try:
                    self.delete_file(local_file)
                    del self.localinfo['files'][remote_file]
                except Exception:
                    pass
                local_file = None
            else: #check that the local file has the same backing file, or there are not backing at all
                qemu_info = self.qemu_get_info(local_file)
                if new_backing_file != qemu_info.get('backing file'):
                    local_file = None


        if local_file == None: #copy the file
            img_name= remote_file.split('/') [-1]
            img_local = self.image_path + '/' + img_name
            local_file = self.get_notused_filename(img_local)
            self.copy_file(remote_file, local_file, use_incremental_out)

            # cache only unmodified master copies; incremental children are
            # created on top of them and are not cached
            if use_incremental_out:
                self.localinfo['files'][remote_file] = local_file
            if new_backing_file:
                self.qemu_change_backing(local_file, new_backing_file)
            qemu_info = self.qemu_get_info(local_file)

        return local_file, qemu_info, use_incremental_out
1492
1493 def launch_server(self, conn, server, rebuild=False, domain=None):
1494 if self.test:
1495 time.sleep(random.randint(20,150)) #sleep random timeto be make it a bit more real
1496 return 0, 'Success'
1497
1498 server_id = server['uuid']
1499 paused = server.get('paused','no')
1500 try:
1501 if domain!=None and rebuild==False:
1502 domain.resume()
1503 #self.server_status[server_id] = 'ACTIVE'
1504 return 0, 'Success'
1505
1506 self.db_lock.acquire()
1507 result, server_data = self.db.get_instance(server_id)
1508 self.db_lock.release()
1509 if result <= 0:
1510 self.logger.error("launch_server ERROR getting server from DB %d %s", result, server_data)
1511 return result, server_data
1512
1513 #0: get image metadata
1514 server_metadata = server.get('metadata', {})
1515 use_incremental = None
1516
1517 if "use_incremental" in server_metadata:
1518 use_incremental = False if server_metadata["use_incremental"] == "no" else True
1519
1520 server_host_files = self.localinfo['server_files'].get( server['uuid'], {})
1521 if rebuild:
1522 #delete previous incremental files
1523 for file_ in server_host_files.values():
1524 self.delete_file(file_['source file'] )
1525 server_host_files={}
1526
1527 #1: obtain aditional devices (disks)
1528 #Put as first device the main disk
1529 devices = [ {"type":"disk", "image_id":server['image_id'], "vpci":server_metadata.get('vpci', None) } ]
1530 if 'extended' in server_data and server_data['extended']!=None and "devices" in server_data['extended']:
1531 devices += server_data['extended']['devices']
1532
1533 for dev in devices:
1534 if dev['image_id'] == None:
1535 continue
1536
1537 self.db_lock.acquire()
1538 result, content = self.db.get_table(FROM='images', SELECT=('path', 'metadata'),
1539 WHERE={'uuid': dev['image_id']})
1540 self.db_lock.release()
1541 if result <= 0:
1542 error_text = "ERROR", result, content, "when getting image", dev['image_id']
1543 self.logger.error("launch_server " + error_text)
1544 return -1, error_text
1545 if content[0]['metadata'] is not None:
1546 dev['metadata'] = json.loads(content[0]['metadata'])
1547 else:
1548 dev['metadata'] = {}
1549
1550 if dev['image_id'] in server_host_files:
1551 dev['source file'] = server_host_files[ dev['image_id'] ] ['source file'] #local path
1552 dev['file format'] = server_host_files[ dev['image_id'] ] ['file format'] # raw or qcow2
1553 continue
1554
1555 #2: copy image to host
1556 remote_file = content[0]['path']
1557 use_incremental_image = use_incremental
1558 if dev['metadata'].get("use_incremental") == "no":
1559 use_incremental_image = False
1560 local_file, qemu_info, use_incremental_image = self.copy_remote_file(remote_file, use_incremental_image)
1561
1562 #create incremental image
1563 if use_incremental_image:
1564 local_file_inc = self.get_notused_filename(local_file, '.inc')
1565 command = 'qemu-img create -f qcow2 '+local_file_inc+ ' -o backing_file='+ local_file
1566 self.logger.debug("command: " + command)
1567 (_, _, stderr) = self.ssh_conn.exec_command(command)
1568 error_msg = stderr.read()
1569 if len(error_msg) > 0:
1570 raise paramiko.ssh_exception.SSHException("Error creating incremental file: " + error_msg)
1571 local_file = local_file_inc
1572 qemu_info = {'file format':'qcow2'}
1573
1574 server_host_files[ dev['image_id'] ] = {'source file': local_file, 'file format': qemu_info['file format']}
1575
1576 dev['source file'] = local_file
1577 dev['file format'] = qemu_info['file format']
1578
1579 self.localinfo['server_files'][ server['uuid'] ] = server_host_files
1580 self.localinfo_dirty = True
1581
1582 #3 Create XML
1583 result, xml = self.create_xml_server(server_data, devices, server_metadata) #local_file
1584 if result <0:
1585 self.logger.error("create xml server error: " + xml)
1586 return -2, xml
1587 self.logger.debug("create xml: " + xml)
1588 atribute = host_thread.lvirt_module.VIR_DOMAIN_START_PAUSED if paused == "yes" else 0
1589 #4 Start the domain
1590 if not rebuild: #ensures that any pending destroying server is done
1591 self.server_forceoff(True)
1592 #self.logger.debug("launching instance " + xml)
1593 conn.createXML(xml, atribute)
1594 #self.server_status[server_id] = 'PAUSED' if paused == "yes" else 'ACTIVE'
1595
1596 return 0, 'Success'
1597
1598 except paramiko.ssh_exception.SSHException as e:
1599 text = e.args[0]
1600 self.logger.error("launch_server id='%s' ssh Exception: %s", server_id, text)
1601 if "SSH session not active" in text:
1602 self.ssh_connect()
1603 except host_thread.lvirt_module.libvirtError as e:
1604 text = e.get_error_message()
1605 self.logger.error("launch_server id='%s' libvirt Exception: %s", server_id, text)
1606 except Exception as e:
1607 text = str(e)
1608 self.logger.error("launch_server id='%s' Exception: %s", server_id, text)
1609 return -1, text
1610
1611 def update_servers_status(self):
1612 # # virDomainState
1613 # VIR_DOMAIN_NOSTATE = 0
1614 # VIR_DOMAIN_RUNNING = 1
1615 # VIR_DOMAIN_BLOCKED = 2
1616 # VIR_DOMAIN_PAUSED = 3
1617 # VIR_DOMAIN_SHUTDOWN = 4
1618 # VIR_DOMAIN_SHUTOFF = 5
1619 # VIR_DOMAIN_CRASHED = 6
1620 # VIR_DOMAIN_PMSUSPENDED = 7 #TODO suspended
1621
1622 if self.test or len(self.server_status)==0:
1623 return
1624
1625 try:
1626 conn = host_thread.lvirt_module.open(self.lvirt_conn_uri)
1627 domains= conn.listAllDomains()
1628 domain_dict={}
1629 for domain in domains:
1630 uuid = domain.UUIDString() ;
1631 libvirt_status = domain.state()
1632 #print libvirt_status
1633 if libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_RUNNING or libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTDOWN:
1634 new_status = "ACTIVE"
1635 elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_PAUSED:
1636 new_status = "PAUSED"
1637 elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_SHUTOFF:
1638 new_status = "INACTIVE"
1639 elif libvirt_status[0] == host_thread.lvirt_module.VIR_DOMAIN_CRASHED:
1640 new_status = "ERROR"
1641 else:
1642 new_status = None
1643 domain_dict[uuid] = new_status
1644 conn.close()
1645 except host_thread.lvirt_module.libvirtError as e:
1646 self.logger.error("get_state() Exception " + e.get_error_message())
1647 return
1648
1649 for server_id, current_status in self.server_status.iteritems():
1650 new_status = None
1651 if server_id in domain_dict:
1652 new_status = domain_dict[server_id]
1653 else:
1654 new_status = "INACTIVE"
1655
1656 if new_status == None or new_status == current_status:
1657 continue
1658 if new_status == 'INACTIVE' and current_status == 'ERROR':
1659 continue #keep ERROR status, because obviously this machine is not running
1660 #change status
1661 self.logger.debug("server id='%s' status change from '%s' to '%s'", server_id, current_status, new_status)
1662 STATUS={'progress':100, 'status':new_status}
1663 if new_status == 'ERROR':
1664 STATUS['last_error'] = 'machine has crashed'
1665 self.db_lock.acquire()
1666 r,_ = self.db.update_rows('instances', STATUS, {'uuid':server_id}, log=False)
1667 self.db_lock.release()
1668 if r>=0:
1669 self.server_status[server_id] = new_status
1670
1671 def action_on_server(self, req, last_retry=True):
1672 '''Perform an action on a req
1673 Attributes:
1674 req: dictionary that contain:
1675 server properties: 'uuid','name','tenant_id','status'
1676 action: 'action'
1677 host properties: 'user', 'ip_name'
1678 return (error, text)
1679 0: No error. VM is updated to new state,
1680 -1: Invalid action, as trying to pause a PAUSED VM
1681 -2: Error accessing host
1682 -3: VM nor present
1683 -4: Error at DB access
1684 -5: Error while trying to perform action. VM is updated to ERROR
1685 '''
1686 server_id = req['uuid']
1687 conn = None
1688 new_status = None
1689 old_status = req['status']
1690 last_error = None
1691
1692 if self.test:
1693 if 'terminate' in req['action']:
1694 new_status = 'deleted'
1695 elif 'shutoff' in req['action'] or 'shutdown' in req['action'] or 'forceOff' in req['action']:
1696 if req['status']!='ERROR':
1697 time.sleep(5)
1698 new_status = 'INACTIVE'
1699 elif 'start' in req['action'] and req['status']!='ERROR':
1700 new_status = 'ACTIVE'
1701 elif 'resume' in req['action'] and req['status']!='ERROR' and req['status']!='INACTIVE':
1702 new_status = 'ACTIVE'
1703 elif 'pause' in req['action'] and req['status']!='ERROR':
1704 new_status = 'PAUSED'
1705 elif 'reboot' in req['action'] and req['status']!='ERROR':
1706 new_status = 'ACTIVE'
1707 elif 'rebuild' in req['action']:
1708 time.sleep(random.randint(20,150))
1709 new_status = 'ACTIVE'
1710 elif 'createImage' in req['action']:
1711 time.sleep(5)
1712 self.create_image(None, req)
1713 else:
1714 try:
1715 conn = host_thread.lvirt_module.open(self.lvirt_conn_uri)
1716 try:
1717 dom = conn.lookupByUUIDString(server_id)
1718 except host_thread.lvirt_module.libvirtError as e:
1719 text = e.get_error_message()
1720 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1721 dom = None
1722 else:
1723 self.logger.error("action_on_server id='%s' libvirt exception: %s", server_id, text)
1724 raise e
1725
1726 if 'forceOff' in req['action']:
1727 if dom == None:
1728 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1729 else:
1730 try:
1731 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1732 dom.destroy()
1733 except Exception as e:
1734 if "domain is not running" not in e.get_error_message():
1735 self.logger.error("action_on_server id='%s' Exception while sending force off: %s",
1736 server_id, e.get_error_message())
1737 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1738 new_status = 'ERROR'
1739
1740 elif 'terminate' in req['action']:
1741 if dom == None:
1742 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1743 new_status = 'deleted'
1744 else:
1745 try:
1746 if req['action']['terminate'] == 'force':
1747 self.logger.debug("sending DESTROY to server id='%s'", server_id)
1748 dom.destroy()
1749 new_status = 'deleted'
1750 else:
1751 self.logger.debug("sending SHUTDOWN to server id='%s'", server_id)
1752 dom.shutdown()
1753 self.pending_terminate_server.append( (time.time()+10,server_id) )
1754 except Exception as e:
1755 self.logger.error("action_on_server id='%s' Exception while destroy: %s",
1756 server_id, e.get_error_message())
1757 last_error = 'action_on_server Exception while destroy: ' + e.get_error_message()
1758 new_status = 'ERROR'
1759 if "domain is not running" in e.get_error_message():
1760 try:
1761 dom.undefine()
1762 new_status = 'deleted'
1763 except Exception:
1764 self.logger.error("action_on_server id='%s' Exception while undefine: %s",
1765 server_id, e.get_error_message())
1766 last_error = 'action_on_server Exception2 while undefine:', e.get_error_message()
1767 #Exception: 'virDomainDetachDevice() failed'
1768 if new_status=='deleted':
1769 if server_id in self.server_status:
1770 del self.server_status[server_id]
1771 if req['uuid'] in self.localinfo['server_files']:
1772 for file_ in self.localinfo['server_files'][ req['uuid'] ].values():
1773 try:
1774 self.delete_file(file_['source file'])
1775 except Exception:
1776 pass
1777 del self.localinfo['server_files'][ req['uuid'] ]
1778 self.localinfo_dirty = True
1779
1780 elif 'shutoff' in req['action'] or 'shutdown' in req['action']:
1781 try:
1782 if dom == None:
1783 self.logger.debug("action_on_server id='%s' domain not running", server_id)
1784 else:
1785 dom.shutdown()
1786 # new_status = 'INACTIVE'
1787 #TODO: check status for changing at database
1788 except Exception as e:
1789 new_status = 'ERROR'
1790 self.logger.error("action_on_server id='%s' Exception while shutdown: %s",
1791 server_id, e.get_error_message())
1792 last_error = 'action_on_server Exception while shutdown: ' + e.get_error_message()
1793
1794 elif 'rebuild' in req['action']:
1795 if dom != None:
1796 dom.destroy()
1797 r = self.launch_server(conn, req, True, None)
1798 if r[0] <0:
1799 new_status = 'ERROR'
1800 last_error = r[1]
1801 else:
1802 new_status = 'ACTIVE'
1803 elif 'start' in req['action']:
1804 # The instance is only create in DB but not yet at libvirt domain, needs to be create
1805 rebuild = True if req['action']['start'] == 'rebuild' else False
1806 r = self.launch_server(conn, req, rebuild, dom)
1807 if r[0] <0:
1808 new_status = 'ERROR'
1809 last_error = r[1]
1810 else:
1811 new_status = 'ACTIVE'
1812
1813 elif 'resume' in req['action']:
1814 try:
1815 if dom == None:
1816 pass
1817 else:
1818 dom.resume()
1819 # new_status = 'ACTIVE'
1820 except Exception as e:
1821 self.logger.error("action_on_server id='%s' Exception while resume: %s",
1822 server_id, e.get_error_message())
1823
1824 elif 'pause' in req['action']:
1825 try:
1826 if dom == None:
1827 pass
1828 else:
1829 dom.suspend()
1830 # new_status = 'PAUSED'
1831 except Exception as e:
1832 self.logger.error("action_on_server id='%s' Exception while pause: %s",
1833 server_id, e.get_error_message())
1834
1835 elif 'reboot' in req['action']:
1836 try:
1837 if dom == None:
1838 pass
1839 else:
1840 dom.reboot()
1841 self.logger.debug("action_on_server id='%s' reboot:", server_id)
1842 #new_status = 'ACTIVE'
1843 except Exception as e:
1844 self.logger.error("action_on_server id='%s' Exception while reboot: %s",
1845 server_id, e.get_error_message())
1846 elif 'createImage' in req['action']:
1847 self.create_image(dom, req)
1848
1849
1850 conn.close()
1851 except host_thread.lvirt_module.libvirtError as e:
1852 if conn is not None: conn.close()
1853 text = e.get_error_message()
1854 new_status = "ERROR"
1855 last_error = text
1856 if 'LookupByUUIDString' in text or 'Domain not found' in text or 'No existe un dominio coincidente' in text:
1857 self.logger.debug("action_on_server id='%s' Exception removed from host", server_id)
1858 else:
1859 self.logger.error("action_on_server id='%s' Exception %s", server_id, text)
1860 #end of if self.test
1861 if new_status == None:
1862 return 1
1863
1864 self.logger.debug("action_on_server id='%s' new status=%s %s",server_id, new_status, last_error)
1865 UPDATE = {'progress':100, 'status':new_status}
1866
1867 if new_status=='ERROR':
1868 if not last_retry: #if there will be another retry do not update database
1869 return -1
1870 elif 'terminate' in req['action']:
1871 #PUT a log in the database
1872 self.logger.error("PANIC deleting server id='%s' %s", server_id, last_error)
1873 self.db_lock.acquire()
1874 self.db.new_row('logs',
1875 {'uuid':server_id, 'tenant_id':req['tenant_id'], 'related':'instances','level':'panic',
1876 'description':'PANIC deleting server from host '+self.name+': '+last_error}
1877 )
1878 self.db_lock.release()
1879 if server_id in self.server_status:
1880 del self.server_status[server_id]
1881 return -1
1882 else:
1883 UPDATE['last_error'] = last_error
1884 if new_status != 'deleted' and (new_status != old_status or new_status == 'ERROR') :
1885 self.db_lock.acquire()
1886 self.db.update_rows('instances', UPDATE, {'uuid':server_id}, log=True)
1887 self.server_status[server_id] = new_status
1888 self.db_lock.release()
1889 if new_status == 'ERROR':
1890 return -1
1891 return 1
1892
1893
1894 def restore_iface(self, name, mac, lib_conn=None):
1895 ''' make an ifdown, ifup to restore default parameter of na interface
1896 Params:
1897 mac: mac address of the interface
1898 lib_conn: connection to the libvirt, if None a new connection is created
1899 Return 0,None if ok, -1,text if fails
1900 '''
1901 conn=None
1902 ret = 0
1903 error_text=None
1904 if self.test:
1905 self.logger.debug("restore_iface '%s' %s", name, mac)
1906 return 0, None
1907 try:
1908 if not lib_conn:
1909 conn = host_thread.lvirt_module.open(self.lvirt_conn_uri)
1910 else:
1911 conn = lib_conn
1912
1913 #wait to the pending VM deletion
1914 #TODO.Revise self.server_forceoff(True)
1915
1916 iface = conn.interfaceLookupByMACString(mac)
1917 if iface.isActive():
1918 iface.destroy()
1919 iface.create()
1920 self.logger.debug("restore_iface '%s' %s", name, mac)
1921 except host_thread.lvirt_module.libvirtError as e:
1922 error_text = e.get_error_message()
1923 self.logger.error("restore_iface '%s' '%s' libvirt exception: %s", name, mac, error_text)
1924 ret=-1
1925 finally:
1926 if lib_conn is None and conn is not None:
1927 conn.close()
1928 return ret, error_text
1929
1930
1931 def create_image(self,dom, req):
1932 if self.test:
1933 if 'path' in req['action']['createImage']:
1934 file_dst = req['action']['createImage']['path']
1935 else:
1936 createImage=req['action']['createImage']
1937 img_name= createImage['source']['path']
1938 index=img_name.rfind('/')
1939 file_dst = self.get_notused_path(img_name[:index+1] + createImage['name'] + '.qcow2')
1940 image_status='ACTIVE'
1941 else:
1942 for retry in (0,1):
1943 try:
1944 server_id = req['uuid']
1945 createImage=req['action']['createImage']
1946 file_orig = self.localinfo['server_files'][server_id] [ createImage['source']['image_id'] ] ['source file']
1947 if 'path' in req['action']['createImage']:
1948 file_dst = req['action']['createImage']['path']
1949 else:
1950 img_name= createImage['source']['path']
1951 index=img_name.rfind('/')
1952 file_dst = self.get_notused_filename(img_name[:index+1] + createImage['name'] + '.qcow2')
1953
1954 self.copy_file(file_orig, file_dst)
1955 qemu_info = self.qemu_get_info(file_orig)
1956 if 'backing file' in qemu_info:
1957 for k,v in self.localinfo['files'].items():
1958 if v==qemu_info['backing file']:
1959 self.qemu_change_backing(file_dst, k)
1960 break
1961 image_status='ACTIVE'
1962 break
1963 except paramiko.ssh_exception.SSHException as e:
1964 image_status='ERROR'
1965 error_text = e.args[0]
1966 self.logger.error("create_image id='%s' ssh Exception: %s", server_id, error_text)
1967 if "SSH session not active" in error_text and retry==0:
1968 self.ssh_connect()
1969 except Exception as e:
1970 image_status='ERROR'
1971 error_text = str(e)
1972 self.logger.error("create_image id='%s' Exception: %s", server_id, error_text)
1973
1974 #TODO insert a last_error at database
1975 self.db_lock.acquire()
1976 self.db.update_rows('images', {'status':image_status, 'progress': 100, 'path':file_dst},
1977 {'uuid':req['new_image']['uuid']}, log=True)
1978 self.db_lock.release()
1979
    def edit_iface(self, port_id, old_net, new_net):
        """Move an SR-IOV (VF) instance interface to another network.

        The hostdev interface is detached from the running domain (when it was
        attached to old_net) and re-attached carrying the vlan tag stored for
        the port at the database (when new_net is set).

        Params:
            port_id: uuid of the port to edit
            old_net: previous network uuid; truthy value triggers the detach
            new_net: new network uuid; truthy value triggers the re-attach
        Returns nothing; all errors are only logged.
        """
        #This action imply remove and insert interface to put proper parameters
        if self.test:
            time.sleep(1)
        else:
        #get iface details
            self.db_lock.acquire()
            r,c = self.db.get_table(FROM='ports as p join resources_port as rp on p.uuid=rp.port_id',
                                    WHERE={'port_id': port_id})
            self.db_lock.release()
            if r<0:
                self.logger.error("edit_iface %s DDBB error: %s", port_id, c)
                return
            elif r==0:
                self.logger.error("edit_iface %s port not found", port_id)
                return
            port=c[0]
            # only SR-IOV virtual functions can be hot re-plugged this way
            if port["model"]!="VF":
                self.logger.error("edit_iface %s ERROR model must be VF", port_id)
                return
            #create xml detach file
            xml=[]
            self.xml_level = 2
            xml.append("<interface type='hostdev' managed='yes'>")
            xml.append(" <mac address='" +port['mac']+ "'/>")
            xml.append(" <source>"+ self.pci2xml(port['pci'])+"\n </source>")
            xml.append('</interface>')


            try:
                conn=None
                conn = host_thread.lvirt_module.open(self.lvirt_conn_uri)
                dom = conn.lookupByUUIDString(port["instance_id"])
                if old_net:
                    # remove the interface from the live domain
                    text="\n".join(xml)
                    self.logger.debug("edit_iface detaching SRIOV interface " + text)
                    dom.detachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)
                if new_net:
                    # reuse the same xml, replacing the closing tag by the vlan
                    # element plus the (optional) guest pci address
                    xml[-1] =" <vlan> <tag id='" + str(port['vlan']) + "'/> </vlan>"
                    self.xml_level = 1
                    xml.append(self.pci2xml(port.get('vpci',None)) )
                    xml.append('</interface>')
                    text="\n".join(xml)
                    self.logger.debug("edit_iface attaching SRIOV interface " + text)
                    dom.attachDeviceFlags(text, flags=host_thread.lvirt_module.VIR_DOMAIN_AFFECT_LIVE)

            except host_thread.lvirt_module.libvirtError as e:
                text = e.get_error_message()
                self.logger.error("edit_iface %s libvirt exception: %s", port["instance_id"], text)

            finally:
                if conn is not None: conn.close()
2032
2033
def create_server(server, db, db_lock, only_of_ports):
    """Compute and select the host resources needed to deploy a server.

    Processes the flavor/extended requirements (numa memory, cores/threads,
    dataplane interfaces), searches an appropriate host/numa at the database,
    computes cpu pinning and SR-IOV/passthrough port assignation, and builds
    the 'resources' dictionary to be inserted at the instances table.

    Params:
        server: server descriptor; uses 'flavor', 'networks', 'image_id',
            'flavor_id', 'tenant_id' and optionally 'extended' and 'host_id'
        db: database connector (get_numas, get_table)
        db_lock: lock that must be held around every db access
        only_of_ports: when True restrict dataplane ports to openflow-connected ones
    Return:
        (0, resources) on success; (negative_number, error_text) on failure
    """
    extended = server.get('extended', None)
    requirements={}
    requirements['numa']={'memory':0, 'proc_req_type': 'threads', 'proc_req_nb':0, 'port_list':[], 'sriov_list':[]}
    requirements['ram'] = server['flavor'].get('ram', 0)
    if requirements['ram']== None:
        requirements['ram'] = 0
    requirements['vcpus'] = server['flavor'].get('vcpus', 0)
    if requirements['vcpus']== None:
        requirements['vcpus'] = 0
    #If extended is not defined get requirements from flavor
    if extended is None:
        #If extended is defined in flavor convert to dictionary and use it
        if 'extended' in server['flavor'] and server['flavor']['extended'] != None:
            json_acceptable_string = server['flavor']['extended'].replace("'", "\"")
            extended = json.loads(json_acceptable_string)
        else:
            extended = None

    #For simplicity only one numa VM are supported in the initial implementation
    if extended != None:
        numas = extended.get('numas', [])
        if len(numas)>1:
            return (-2, "Multi-NUMA VMs are not supported yet")

        #a for loop is used in order to be ready to multi-NUMA VMs
        request = []
        for numa in numas:
            numa_req = {}
            numa_req['memory'] = numa.get('memory', 0)
            if 'cores' in numa:
                numa_req['proc_req_nb'] = numa['cores']                     #number of cores or threads to be reserved
                numa_req['proc_req_type'] = 'cores'                         #indicates whether cores or threads must be reserved
                numa_req['proc_req_list'] = numa.get('cores-id', None)      #list of ids to be assigned to the cores or threads
            elif 'paired-threads' in numa:
                numa_req['proc_req_nb'] = numa['paired-threads']
                numa_req['proc_req_type'] = 'paired-threads'
                numa_req['proc_req_list'] = numa.get('paired-threads-id', None)
            elif 'threads' in numa:
                numa_req['proc_req_nb'] = numa['threads']
                numa_req['proc_req_type'] = 'threads'
                numa_req['proc_req_list'] = numa.get('threads-id', None)
            else:
                numa_req['proc_req_nb'] = 0 # by default
                numa_req['proc_req_type'] = 'threads'

            #Generate a list of sriov and another for physical interfaces
            interfaces = numa.get('interfaces', [])
            sriov_list = []
            port_list = []
            for iface in interfaces:
                iface['bandwidth'] = int(iface['bandwidth'])
                if iface['dedicated'][:3]=='yes':
                    port_list.append(iface)
                else:
                    sriov_list.append(iface)

            #Save lists ordered from more restrictive to less bw requirements
            numa_req['sriov_list'] = sorted(sriov_list, key=lambda k: k['bandwidth'], reverse=True)
            numa_req['port_list'] = sorted(port_list, key=lambda k: k['bandwidth'], reverse=True)

            request.append(numa_req)

        #Search in db for an appropriate numa for each requested numa
        #at the moment multi-NUMA VMs are not supported
        if len(request)>0:
            requirements['numa'].update(request[0])
        if requirements['numa']['memory']>0:
            requirements['ram']=0  #By the moment I make incompatible ask for both Huge and non huge pages memory
        elif requirements['ram']==0:
            return (-1, "Memory information not set neither at extended field not at ram")
        if requirements['numa']['proc_req_nb']>0:
            requirements['vcpus']=0 #By the moment I make incompatible ask for both Isolated and non isolated cpus
        elif requirements['vcpus']==0:
            return (-1, "Processor information not set neither at extended field not at vcpus")

    db_lock.acquire()
    result, content = db.get_numas(requirements, server.get('host_id', None), only_of_ports)
    db_lock.release()

    if result == -1:
        return (-1, content)

    numa_id = content['numa_id']
    host_id = content['host_id']

    #obtain threads_id and calculate pinning
    cpu_pinning = []
    reserved_threads=[]
    if requirements['numa']['proc_req_nb']>0:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_core',
                                       SELECT=('id','core_id','thread_id'),
                                       WHERE={'numa_id':numa_id,'instance_id': None, 'status':'ok'} )
        db_lock.release()
        if result <= 0:
            return -1, content

        #convert rows to a dictionary indexed by core_id
        cores_dict = {}
        for row in content:
            if not row['core_id'] in cores_dict:
                cores_dict[row['core_id']] = []
            cores_dict[row['core_id']].append([row['thread_id'],row['id']])

        #In case full cores are requested
        paired = 'N'
        if requirements['numa']['proc_req_type'] == 'cores':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list == None:
                vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))

            for threads in cores_dict.itervalues():
                #we need full cores
                if len(threads) != 2:
                    continue

                #set pinning for the first thread
                cpu_pinning.append( [ vcpu_id_list.pop(0), threads[0][0], threads[0][1] ] )

                #reserve so it is not used the second thread
                reserved_threads.append(threads[1][1])

                if len(vcpu_id_list) == 0:
                    break

        #In case paired threads are requested
        elif requirements['numa']['proc_req_type'] == 'paired-threads':
            paired = 'Y'
            #Get/create the list of the vcpu_ids
            if requirements['numa']['proc_req_list'] != None:
                vcpu_id_list = []
                for pair in requirements['numa']['proc_req_list']:
                    # each entry must be a [thread, thread] pair
                    # (an unreachable bare 'return' after this error return was removed)
                    if len(pair)!=2:
                        return -1, "Field paired-threads-id not properly specified"
                    vcpu_id_list.append(pair[0])
                    vcpu_id_list.append(pair[1])
            else:
                vcpu_id_list = range(0,2*int(requirements['numa']['proc_req_nb']))

            for threads in cores_dict.itervalues():
                #we need full cores
                if len(threads) != 2:
                    continue
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #set pinning for the second thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

        #In case normal threads are requested
        elif requirements['numa']['proc_req_type'] == 'threads':
            #Get/create the list of the vcpu_ids
            vcpu_id_list = requirements['numa']['proc_req_list']
            if vcpu_id_list == None:
                vcpu_id_list = range(0,int(requirements['numa']['proc_req_nb']))

            # prefer cores with fewer free threads so full cores stay available
            for threads_index in sorted(cores_dict, key=lambda k: len(cores_dict[k])):
                threads = cores_dict[threads_index]
                #set pinning for the first thread
                cpu_pinning.append([vcpu_id_list.pop(0), threads[0][0], threads[0][1]])

                #if exists, set pinning for the second thread
                if len(threads) == 2 and len(vcpu_id_list) != 0:
                    cpu_pinning.append([vcpu_id_list.pop(0), threads[1][0], threads[1][1]])

                if len(vcpu_id_list) == 0:
                    break

    #Get the source pci addresses for the selected numa
    used_sriov_ports = []
    for port in requirements['numa']['sriov_list']:
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
        db_lock.release()
        if result <= 0:
            return -1, content
        for row in content:
            if row['id'] in used_sriov_ports or row['id']==port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac']
            del port['mac']
            port['port_id']=row['id']
            port['Mbps_used'] = port['bandwidth']
            used_sriov_ports.append(row['id'])
            break

    for port in requirements['numa']['port_list']:
        port['Mbps_used'] = None
        if port['dedicated'] != "yes:sriov":
            port['mac_address'] = port['mac']
            del port['mac']
            continue
        db_lock.acquire()
        result, content = db.get_table(FROM='resources_port', SELECT=('id', 'pci', 'mac', 'Mbps'),WHERE={'numa_id':numa_id,'root_id': port['port_id'], 'port_id': None, 'Mbps_used': 0} )
        db_lock.release()
        if result <= 0:
            return -1, content
        port['Mbps_used'] = content[0]['Mbps']
        for row in content:
            if row['id'] in used_sriov_ports or row['id']==port['port_id']:
                continue
            port['pci'] = row['pci']
            if 'mac_address' not in port:
                port['mac_address'] = row['mac'] # mac cannot be set to passthrough ports
            del port['mac']
            port['port_id']=row['id']
            used_sriov_ports.append(row['id'])
            break

    server['host_id'] = host_id

    #Generate dictionary for saving in db the instance resources
    resources = {}
    resources['bridged-ifaces'] = []

    numa_dict = {}
    numa_dict['interfaces'] = []

    numa_dict['interfaces'] += requirements['numa']['port_list']
    numa_dict['interfaces'] += requirements['numa']['sriov_list']

    #Check bridge information
    unified_dataplane_iface=[]
    unified_dataplane_iface += requirements['numa']['port_list']
    unified_dataplane_iface += requirements['numa']['sriov_list']

    for control_iface in server.get('networks', []):
        control_iface['net_id']=control_iface.pop('uuid')
        #Get the bridge name
        db_lock.acquire()
        result, content = db.get_table(FROM='nets',
                                       SELECT=('name', 'type', 'vlan', 'provider', 'enable_dhcp',
                                               'dhcp_first_ip', 'dhcp_last_ip', 'cidr'),
                                       WHERE={'uuid': control_iface['net_id']})
        db_lock.release()
        if result < 0:
            # NOTE(review): a database error is silently ignored here and the
            # interface is skipped; confirm this best-effort behaviour is intended
            pass
        elif result==0:
            return -1, "Error at field netwoks: Not found any network wit uuid %s" % control_iface['net_id']
        else:
            network=content[0]
            if control_iface.get("type", 'virtual') == 'virtual':
                if network['type']!='bridge_data' and network['type']!='bridge_man':
                    return -1, "Error at field netwoks: network uuid %s for control interface is not of type bridge_man or bridge_data" % control_iface['net_id']
                resources['bridged-ifaces'].append(control_iface)
                if network.get("provider") and network["provider"][0:3] == "OVS":
                    control_iface["type"] = "instance:ovs"
                else:
                    control_iface["type"] = "instance:bridge"
                if network.get("vlan"):
                    control_iface["vlan"] = network["vlan"]

                if network.get("enable_dhcp") == 'true':
                    control_iface["enable_dhcp"] = network.get("enable_dhcp")
                    control_iface["dhcp_first_ip"] = network["dhcp_first_ip"]
                    control_iface["dhcp_last_ip"] = network["dhcp_last_ip"]
                    control_iface["cidr"] = network["cidr"]
            else:
                if network['type']!='data' and network['type']!='ptp':
                    return -1, "Error at field netwoks: network uuid %s for dataplane interface is not of type data or ptp" % control_iface['net_id']
                #dataplane interface, look for it in the numa tree and asign this network
                iface_found=False
                for dataplane_iface in numa_dict['interfaces']:
                    if dataplane_iface['name'] == control_iface.get("name"):
                        if (dataplane_iface['dedicated'] == "yes" and control_iface["type"] != "PF") or \
                            (dataplane_iface['dedicated'] == "no" and control_iface["type"] != "VF") or \
                            (dataplane_iface['dedicated'] == "yes:sriov" and control_iface["type"] != "VFnotShared") :
                            return -1, "Error at field netwoks: mismatch at interface '%s' from flavor 'dedicated=%s' and networks 'type=%s'" % \
                                (control_iface.get("name"), dataplane_iface['dedicated'], control_iface["type"])
                        dataplane_iface['uuid'] = control_iface['net_id']
                        if dataplane_iface['dedicated'] == "no":
                            dataplane_iface['vlan'] = network['vlan']
                        if dataplane_iface['dedicated'] != "yes" and control_iface.get("mac_address"):
                            dataplane_iface['mac_address'] = control_iface.get("mac_address")
                        if control_iface.get("vpci"):
                            dataplane_iface['vpci'] = control_iface.get("vpci")
                        iface_found=True
                        break
                if not iface_found:
                    return -1, "Error at field netwoks: interface name %s from network not found at flavor" % control_iface.get("name")

    resources['host_id'] = host_id
    resources['image_id'] = server['image_id']
    resources['flavor_id'] = server['flavor_id']
    resources['tenant_id'] = server['tenant_id']
    resources['ram'] = requirements['ram']
    resources['vcpus'] = requirements['vcpus']
    resources['status'] = 'CREATING'

    if 'description' in server: resources['description'] = server['description']
    if 'name' in server: resources['name'] = server['name']

    resources['extended'] = {}  #optional
    resources['extended']['numas'] = []
    numa_dict['numa_id'] = numa_id
    numa_dict['memory'] = requirements['numa']['memory']
    numa_dict['cores'] = []

    for core in cpu_pinning:
        numa_dict['cores'].append({'id': core[2], 'vthread': core[0], 'paired': paired})
    for core in reserved_threads:
        numa_dict['cores'].append({'id': core})
    resources['extended']['numas'].append(numa_dict)
    if extended!=None and 'devices' in extended:   #TODO allow extra devices without numa
        resources['extended']['devices'] = extended['devices']

    return 0, resources
2370