1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
25 This is thread that interact with the dhcp server to get the IP addresses
27 __author__
="Pablo Montes, Alfonso Tierno"
28 __date__
="$4-Jan-2016 12:07:15$"
40 #TODO: insert a logging system
42 class dhcp_thread(threading
.Thread
):
43 def __init__(self
, dhcp_params
, db
, db_lock
, test
, dhcp_nets
, logger_name
=None, debug
=None):
45 Arguments: thread_info must be a dictionary with:
46 'dhcp_params' dhcp server parameters with the following keys:
47 mandatory : user, host, port, key, ifaces(interface name list of the one managed by the dhcp)
48 optional: password, key, port(22)
49 'db' 'db_lock': database class and lock for accessing it
50 'test': in test mode no acces to a server is done, and ip is invented
52 threading
.Thread
.__init
__(self
)
53 self
.dhcp_params
= dhcp_params
55 self
.db_lock
= db_lock
57 self
.dhcp_nets
= dhcp_nets
60 self
.logger_name
= logger_name
62 self
.logger_name
= "openvim.dhcp"
63 self
.logger
= logging
.getLogger(self
.logger_name
)
65 self
.logger
.setLevel(getattr(logging
, debug
))
67 self
.mac_status
={} #dictionary of mac_address to retrieve information
70 #next_reading: time for the next trying to check ACTIVE status or IP
71 #created: time when it was added
72 #active: time when the VM becomes into ACTIVE status
75 self
.queueLock
= threading
.Lock()
76 self
.taskQueue
= Queue
.Queue(2000)
78 def ssh_connect(self
):
81 self
.ssh_conn
= paramiko
.SSHClient()
82 self
.ssh_conn
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
83 self
.ssh_conn
.load_system_host_keys()
84 self
.ssh_conn
.connect(self
.dhcp_params
["host"], port
=self
.dhcp_params
.get("port", 22),
85 username
=self
.dhcp_params
["user"], password
=self
.dhcp_params
.get("password"),
86 key_filename
=self
.dhcp_params
.get("keyfile"), timeout
=2)
87 except paramiko
.ssh_exception
.SSHException
as e
:
88 self
.logger
.error("ssh_connect ssh Exception " + str(e
))
90 def load_mac_from_db(self
):
91 #TODO get macs to follow from the database
92 self
.logger
.debug("load macs from db")
93 self
.db_lock
.acquire()
94 r
,c
= self
.db
.get_table(SELECT
=('mac','ip_address','nets.uuid as net_id', ),
95 FROM
='ports join nets on ports.net_id=nets.uuid',
96 WHERE_NOT
={'ports.instance_id': None, 'nets.provider': None})
97 self
.db_lock
.release()
101 self
.logger
.error("Error getting data from database: " + c
)
104 if port
["net_id"] in self
.dhcp_nets
:
105 self
.mac_status
[ port
["mac"] ] = {"ip": port
["ip_address"], "next_reading": now
, "created": now
, "retries":0}
107 def insert_task(self
, task
, *aditional
):
109 self
.queueLock
.acquire()
110 task
= self
.taskQueue
.put( (task
,) + aditional
, timeout
=5)
111 self
.queueLock
.release()
114 return -1, "timeout inserting a task over dhcp_thread"
117 self
.logger
.debug("starting, nets: " + str(self
.dhcp_nets
))
118 next_iteration
= time
.time() + 10
120 self
.load_mac_from_db()
123 self
.queueLock
.acquire()
124 if not self
.taskQueue
.empty():
125 task
= self
.taskQueue
.get()
128 self
.queueLock
.release()
132 if now
>= next_iteration
:
133 next_iteration
= self
.get_ip_from_dhcp()
139 self
.logger
.debug("processing task add mac " + str(task
[1]))
141 self
.mac_status
[task
[1] ] = {"ip": None, "next_reading": now
, "created": now
, "retries":0}
143 elif task
[0] == 'del':
144 self
.logger
.debug("processing task del mac " + str(task
[1]))
145 if task
[1] in self
.mac_status
:
146 del self
.mac_status
[task
[1] ]
147 elif task
[0] == 'exit':
148 self
.logger
.debug("processing task exit")
152 self
.logger
.error("unknown task: " + str(task
))
153 except Exception as e
:
154 self
.logger
.critical("Unexpected exception at run: " + str(e
), exc_info
=True)
159 self
.ssh_conn
.close()
160 except Exception as e
:
161 self
.logger
.error("terminate Exception: " + str(e
))
162 self
.logger
.debug("exit from dhcp_thread")
164 def get_ip_from_dhcp(self
):
167 next_iteration
= now
+ 40000 # >10 hores
169 #print self.name, "Iteration"
170 for mac_address
in self
.mac_status
:
171 if now
< self
.mac_status
[mac_address
]["next_reading"]:
172 if self
.mac_status
[mac_address
]["next_reading"] < next_iteration
:
173 next_iteration
= self
.mac_status
[mac_address
]["next_reading"]
176 if self
.mac_status
[mac_address
].get("active") == None:
177 #check from db if already active
178 self
.db_lock
.acquire()
179 r
,c
= self
.db
.get_table(FROM
="ports as p join instances as i on p.instance_id=i.uuid",
180 WHERE
={"p.mac": mac_address
, "i.status": "ACTIVE"})
181 self
.db_lock
.release()
183 self
.mac_status
[mac_address
]["active"] = now
184 self
.mac_status
[mac_address
]["next_reading"] = (int(now
)/2 +1)* 2
185 self
.logger
.debug("mac %s VM ACTIVE", mac_address
)
186 self
.mac_status
[mac_address
]["retries"] = 0
188 #print self.name, "mac %s VM INACTIVE" % (mac_address)
189 if now
- self
.mac_status
[mac_address
]["created"] > 300:
190 #modify Database to tell openmano that we can not get dhcp from the machine
191 if not self
.mac_status
[mac_address
].get("ip"):
192 self
.db_lock
.acquire()
193 r
,c
= self
.db
.update_rows("ports", {"ip_address": "0.0.0.0"}, {"mac": mac_address
})
194 self
.db_lock
.release()
195 self
.mac_status
[mac_address
]["ip"] = "0.0.0.0"
196 self
.logger
.debug("mac %s >> set to 0.0.0.0 because of timeout", mac_address
)
197 self
.mac_status
[mac_address
]["next_reading"] = (int(now
)/60 +1)* 60
199 self
.mac_status
[mac_address
]["next_reading"] = (int(now
)/6 +1)* 6
200 if self
.mac_status
[mac_address
]["next_reading"] < next_iteration
:
201 next_iteration
= self
.mac_status
[mac_address
]["next_reading"]
206 if self
.mac_status
[mac_address
]["retries"]>random
.randint(10,100): #wait between 10 and 100 seconds to produce a fake IP
207 content
= self
.get_fake_ip()
210 elif self
.dhcp_params
["host"]=="localhost":
212 command
= ['get_dhcp_lease.sh', mac_address
]
213 content
= subprocess
.check_output(command
)
214 except Exception as e
:
215 self
.logger
.error("get_ip_from_dhcp subprocess Exception " + str(e
))
219 if not self
.ssh_conn
:
221 command
= 'get_dhcp_lease.sh ' + mac_address
222 (_
, stdout
, _
) = self
.ssh_conn
.exec_command(command
)
223 content
= stdout
.read()
224 except paramiko
.ssh_exception
.SSHException
as e
:
225 self
.logger
.error("get_ip_from_dhcp: ssh_Exception: " + str(e
))
228 except Exception as e
:
229 self
.logger
.error("get_ip_from_dhcp: Exception: " + str(e
))
234 self
.mac_status
[mac_address
]["ip"] = content
236 self
.db_lock
.acquire()
237 r
,c
= self
.db
.update_rows("ports", {"ip_address": content
}, {"mac": mac_address
})
238 self
.db_lock
.release()
240 self
.logger
.error("Database update error: " + c
)
242 self
.mac_status
[mac_address
]["retries"] = 0
243 self
.mac_status
[mac_address
]["next_reading"] = (int(now
)/3600 +1)* 36000 # 10 hores
244 if self
.mac_status
[mac_address
]["next_reading"] < next_iteration
:
245 next_iteration
= self
.mac_status
[mac_address
]["next_reading"]
246 self
.logger
.debug("mac %s >> %s", mac_address
, content
)
249 self
.mac_status
[mac_address
]["retries"] +=1
250 #next iteration is every 2sec at the beginning; every 5sec after a minute, every 1min after a 5min
251 if now
- self
.mac_status
[mac_address
]["active"] > 120:
252 #modify Database to tell openmano that we can not get dhcp from the machine
253 if not self
.mac_status
[mac_address
].get("ip"):
254 self
.db_lock
.acquire()
255 r
,c
= self
.db
.update_rows("ports", {"ip_address": "0.0.0.0"}, {"mac": mac_address
})
256 self
.db_lock
.release()
257 self
.mac_status
[mac_address
]["ip"] = "0.0.0.0"
258 self
.logger
.debug("mac %s >> set to 0.0.0.0 because of timeout", mac_address
)
260 if now
- self
.mac_status
[mac_address
]["active"] > 60:
261 self
.mac_status
[mac_address
]["next_reading"] = (int(now
)/6 +1)* 6
262 elif now
- self
.mac_status
[mac_address
]["active"] > 300:
263 self
.mac_status
[mac_address
]["next_reading"] = (int(now
)/60 +1)* 60
265 self
.mac_status
[mac_address
]["next_reading"] = (int(now
)/2 +1)* 2
267 if self
.mac_status
[mac_address
]["next_reading"] < next_iteration
:
268 next_iteration
= self
.mac_status
[mac_address
]["next_reading"]
269 return next_iteration
271 def get_fake_ip(self
):
272 fake_ip
= "192.168.{}.{}".format(random
.randint(1,254), random
.randint(1,254) )
274 #check not already provided
276 for mac_address
in self
.mac_status
:
277 if self
.mac_status
[mac_address
]["ip"] == fake_ip
:
284 #EXAMPLE of bash script that must be available at the DHCP server for "isc-dhcp-server" type
285 # $ cat ./get_dhcp_lease.sh
288 # ($1=="lease" && $3=="{"){ lease=$2; active="no"; found="no" }
289 # ($1=="binding" && $2=="state" && $3=="active;"){ active="yes" }
290 # ($1=="hardware" && $2=="ethernet" && $3==tolower("'$1';")){ found="yes" }
291 # ($1=="client-hostname"){ name=$2 }
292 # ($1=="}"){ if (active=="yes" && found=="yes"){ target_lease=lease; target_name=name}}
293 # END{printf("%s", target_lease)} #print target_name
294 # ' /var/lib/dhcp/dhcpd.leases