2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from datetime
import date
18 from Queue
import Queue
27 from util
.util
import get_url_target
# Zero both counters. The enclosing def/class header is not visible in this excerpt.
31 self
._request
_count
= 0
32 self
._response
_count
= 0
# Lock stored on the instance; where it is acquired is not visible in this excerpt.
34 self
._lock
= threading
.Lock()
def request_count(self):
    """Return the current value of the ``_request_count`` attribute."""
    current = self._request_count
    return current
def request_count(self, value):
    """Store ``value`` in the ``_request_count`` attribute.

    Args:
        value: The new counter value; it is stored as given.
    """
    new_count = value
    self._request_count = new_count
def response_count(self):
    """Return the current value of the ``_response_count`` attribute."""
    current = self._response_count
    return current
@response_count.setter
def response_count(self, value):
    """Store ``value`` in the ``_response_count`` attribute.

    Args:
        value: The new counter value; it is stored as given.
    """
    new_count = value
    self._response_count = new_count
class Worker(threading.Thread):
    """Thread that echoes received data back on queued connections.

    Connections are taken from a shared queue, served once (one ``recv``
    followed by one ``sendall``) and then put back on the queue so that any
    worker can serve them again.

    Args:
        log: Logger used for debug and error messages.
        connections: Queue of connected socket objects shared by all workers.
        stats: Object exposing ``request_count`` and ``response_count``
            attributes, incremented as requests are read and answered.
    """

    # Seconds to wait for a connection before re-checking ``running``.
    # Blocking briefly avoids spinning at full CPU on an empty queue.
    _POLL_INTERVAL = 0.1

    def __init__(self, log, connections, stats):
        super(Worker, self).__init__()
        self._log = log
        self._connections = connections
        self._stats = stats
        self._running = True
        self._lock = threading.Lock()

    @property
    def running(self):
        """Whether the serving loop should keep going."""
        return self._running

    @running.setter
    def running(self, value):
        self._running = value

    def run(self):
        """Serve queued connections until ``running`` is set to a false value."""
        # The file imports only the Queue class, so fetch the Empty exception
        # locally, under whichever module name this interpreter provides.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty

        while self.running:
            try:
                connection = self._connections.get(timeout=self._POLL_INTERVAL)
            except Empty:
                continue

            try:
                req = connection.recv(1024)
            except socket.error as msg:
                # Lazy %-args: the previous '"..." % msg' had no placeholder
                # and raised TypeError inside this handler.
                self._log.error("error with connection read: %s", msg)
                self._connections.put(connection)
                continue

            if not req:
                # NOTE(review): an empty read normally means the peer closed
                # the connection; it is still requeued here -- confirm whether
                # it should be closed and dropped instead.
                self._connections.put(connection)
                continue

            # Decode for logging only; 'replace' keeps malformed input from
            # raising and killing the thread.
            self._log.debug("got: %s", req.decode('UTF-8', 'replace'))
            self._stats.request_count += 1

            try:
                # Echo the bytes exactly as received: sendall() needs bytes,
                # not the decoded text.
                connection.sendall(req)
                self._stats.response_count += 1
            except socket.error as msg:
                self._log.error("error with connection write: %s", msg)
                self._connections.put(connection)
                continue

            self._connections.put(connection)
def __init__(self, worker_count=5):
    """Set up logging, shared state and the worker pool.

    Args:
        worker_count: Number of Worker objects to create (default 5).
            They are constructed here but not started.
    """
    self._log = logging.getLogger("pong")
    self._log.setLevel(logging.DEBUG)

    self.listen_ip = None
    self.listen_port = None

    self._lock = threading.Lock()

    # close_socket() tests self._socket, so the attribute has to exist even
    # when no listening socket has been created yet.
    self._socket = None

    # Connections and statistics are shared by every worker.
    self._connections = Queue()
    self._stats = Stats()

    self._enabled = False

    self._workers = [
        Worker(self._log, self._connections, self._stats)
        for _ in range(worker_count)
    ]
def listen_port(self):
    """Return the current value of the ``_listen_port`` attribute."""
    port = self._listen_port
    return port
def listen_port(self, value):
    """Log the new port at debug level, then store it in ``_listen_port``.

    Args:
        value: The port to store; it is stored as given.
    """
    message = "new listen port: %s" % value
    self._log.debug(message)
    self._listen_port = value
# Return the stored ``_listen_ip`` value. The def line is not visible in this excerpt.
140 return self
._listen
_ip
def listen_ip(self, value):
    """Log the new address at debug level, then store it in ``_listen_ip``.

    Args:
        value: The address to store; it is stored as given.
    """
    message = "listen pong ip: %s" % value
    self._log.debug(message)
    self._listen_ip = value
def request_count(self):
    """Return ``request_count`` as reported by the ``_stats`` object."""
    stats = self._stats
    return stats.request_count
def response_count(self):
    """Return ``response_count`` as reported by the ``_stats`` object."""
    stats = self._stats
    return stats.response_count
# Start-up sequence. The def line is not visible in this excerpt.
162 self
._log
.debug("starting")
# Run self._listen on its own thread and keep a reference to that thread.
164 self
.listener_thread
= threading
.Thread(target
=self
._listen
)
165 self
.listener_thread
.start()
# Iterate over the workers; the loop body is not visible in this excerpt.
# NOTE(review): presumably each worker is started here -- confirm. A
# threading.Thread can only be started once, so verify restart-after-stop.
166 for worker
in self
._workers
:
# Shutdown sequence. The def line is not visible in this excerpt.
171 self
._enabled
= False
# Clear every worker's running flag first, then wait on the workers.
173 self
._log
.debug("stopping workers")
174 for worker
in self
._workers
:
175 worker
.running
= False
177 self
._log
.debug("joining on workers")
178 for worker
in self
._workers
:
# Body of this check is not visible in this excerpt.
179 if worker
.is_alive():
# Drain the connection queue.
# NOTE(review): self._connections is built with Queue() and no maxsize, and
# full() is always False for an unbounded queue, so this loop looks like it
# never runs; "not self._connections.empty()" was probably intended -- confirm.
182 while self
._connections
.full():
# NOTE(review): what is done with the dequeued connection is not visible here.
184 connection
= self
._connections
.get_nowait()
def close_socket(self, msg):
    """Shut down and release the listening socket, if there is one.

    Does nothing when ``self._socket`` is already ``None``, so it is safe to
    call more than once.

    Args:
        msg: Reason for closing; included in the info log line.
    """
    if self._socket is None:
        return

    try:
        self._socket.shutdown(socket.SHUT_RD)
    except socket.error as err:
        # shutdown() can fail on a socket that is not connected; the
        # descriptor still has to be released below.
        self._log.debug("socket shutdown failed: %s", err)
    finally:
        # Release the descriptor and clear the attribute so that a repeated
        # call becomes a no-op instead of acting on a dead socket.
        self._socket.close()
        self._socket = None

    self._log.info("Closed socket with msg={}".format(msg))
# Listener body. The def line and any try:/loop lines are not visible in this excerpt.
# Refuse to listen until both an address and a port have been set.
198 if self
._listen
_ip
is None or self
.listen_port
is None:
199 self
._log
.error("address not properly configured to listen")
202 self
._log
.info("listen for incomming connections")
# Create a TCP/IPv4 socket with address reuse and TCP_NODELAY enabled.
204 self
._socket
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
205 self
._socket
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
206 self
._socket
.setsockopt(socket
.IPPROTO_TCP
, socket
.TCP_NODELAY
, 1)
207 # self._socket.bind((self.listen_ip, self.listen_port))
# Binds to all interfaces: the configured listen IP is checked above but is
# not used for the bind.
208 self
._socket
.bind(("0.0.0.0", self
.listen_port
))
# One-second timeout on the listening socket, so accept() raises
# socket.timeout instead of blocking indefinitely.
209 self
._socket
.settimeout(1)
214 self
._socket
.listen(1)
215 connection
, address
= self
._socket
.accept()
# Handler body for the accept timeout is not visible in this excerpt.
216 except socket
.timeout
:
218 self
._log
.info("Accepted connection from {}".format(address
))
# Hand the accepted connection to the workers through the shared queue.
220 self
._connections
.put(connection
)
# Any other socket error closes the listening socket, with the error as the reason.
223 except socket
.error
as msg
:
224 self
.close_socket(msg
)
# Tornado request handler that builds a response from the pong instance's
# request/response counters.
226 class PongStatsHandler(tornado
.web
.RequestHandler
):
# Keep a reference to the pong instance passed in as pong_instance.
227 def initialize(self
, pong_instance
):
228 self
._pong
_instance
= pong_instance
# Counter payload. The enclosing method's def line and what is done with
# the dict are not visible in this excerpt.
231 response
= {'ping-request-rx-count': self
._pong
_instance
.request_count
,
232 'ping-response-tx-count': self
._pong
_instance
.response_count
}
# Tornado request handler for reading and updating the pong instance's
# listen address and port.
237 class PongServerHandler(tornado
.web
.RequestHandler
):
# Keep a reference to the pong instance passed in as pong_instance.
238 def initialize(self
, pong_instance
):
239 self
._pong
_instance
= pong_instance
# Current address/port payload. The enclosing method's def line and what is
# done with the dict are not visible in this excerpt.
242 response
= {'ip': self
._pong
_instance
.listen_ip
,
243 'port': self
._pong
_instance
.listen_port
}
# Validate a JSON body and apply its 'ip' and 'port' values to the pong instance.
247 def post(self
, args
):
248 target
= get_url_target(self
.request
.uri
)
249 body
= self
.request
.body
.decode("utf-8")
250 body_header
= self
.request
.headers
.get("Content-Type")
# NOTE(review): presumably headers.get() returns None when the header is
# absent, in which case this "in" test raises TypeError -- confirm.
252 if "json" not in body_header
:
253 self
.write("Content-Type must be some kind of json")
258 json_dicts
= json
.loads(body
)
# NOTE(review): this repeats the Content-Type message right after
# json.loads; if it is the parse-failure path the text is misleading -- confirm.
260 self
.write("Content-Type must be some kind of json")
264 if target
== "server":
# Exact-type checks on the two fields; the rejection bodies are not visible
# in this excerpt.
266 if type(json_dicts
['port']) is not int:
# NOTE(review): "unicode" is a Python 2 builtin; on Python 3 this line
# raises NameError -- confirm the target interpreter.
270 if type(json_dicts
['ip']) not in (str, unicode):
274 self
._pong
_instance
.listen_ip
= json_dicts
['ip']
275 self
._pong
_instance
.listen_port
= json_dicts
['port']
# Tornado request handler for reading and changing whether the pong instance
# is enabled.
283 class PongAdminStatusHandler(tornado
.web
.RequestHandler
):
# Keep a reference to the pong instance passed in as pong_instance.
284 def initialize(self
, pong_instance
):
285 self
._pong
_instance
= pong_instance
# Status query. The enclosing method's def line and what is done with the
# response dict are not visible in this excerpt.
288 target
= get_url_target(self
.request
.uri
)
290 if target
== "state":
291 value
= "enabled" if self
._pong
_instance
.enabled
else "disabled"
293 response
= { 'adminstatus': value
}
# Validate a JSON body and start or stop the pong instance from its 'enable' flag.
300 def post(self
, args
):
301 target
= get_url_target(self
.request
.uri
)
302 body
= self
.request
.body
.decode("utf-8")
303 body_header
= self
.request
.headers
.get("Content-Type")
# NOTE(review): presumably headers.get() returns None when the header is
# absent, in which case this "in" test raises TypeError -- confirm.
305 if "json" not in body_header
:
306 self
.write("Content-Type must be some kind of json")
311 json_dicts
= json
.loads(body
)
# NOTE(review): this repeats the Content-Type message right after
# json.loads; if it is the parse-failure path the text is misleading -- confirm.
313 self
.write("Content-Type must be some kind of json")
317 if target
== "state":
# Exact-type check on the flag; the rejection body is not visible in this excerpt.
318 if type(json_dicts
['enable']) is not bool:
322 if json_dicts
['enable']:
# Only start when not already enabled.
323 if not self
._pong
_instance
.enabled
:
324 self
._pong
_instance
.start()
# The branch line that leads to stop() is not visible in this excerpt.
326 self
._pong
_instance
.stop()