2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from datetime
import date
26 from util
.util
import get_url_target
30 self
._log
= logging
.getLogger("ping")
31 self
._log
.setLevel(logging
.DEBUG
)
34 self
._request
_count
= 0;
35 self
._response
_count
= 0;
38 self
._pong
_port
= None
40 self
._send
_rate
= 1 # per second
42 self
._close
_lock
= threading
.Lock()
49 return self
._send
_rate
52 def rate(self
, value
):
53 self
._log
.debug("new rate: %s" % value
)
54 self
._send
_rate
= value
58 return self
._pong
_port
61 def pong_port(self
, value
):
62 self
._log
.debug("new pong port: %s" % value
)
63 self
._pong
_port
= value
70 def pong_ip(self
, value
):
72 self
._log
.debug("new pong ip: %s" % value
)
80 def request_count(self
):
81 return self
._request
_count
84 def response_count(self
):
85 return self
._response
_count
88 self
._log
.debug("starting")
91 self
.send_thread
= threading
.Thread(target
=self
.send_ping
)
92 self
.recv_thread
= threading
.Thread(target
=self
.recv_resp
)
93 self
.send_thread
.start()
94 self
.recv_thread
.start()
97 self
._log
.debug("stopping")
99 self
.close_socket("stopping")
101 def close_socket(self
, msg
):
102 self
._close
_lock
.acquire()
103 if self
._socket
!= None:
106 self
._log
.info("Closed socket with msg={}".format(msg
))
107 self
._close
_lock
.release()
109 def open_socket(self
):
111 self
._log
.debug("construct socket")
112 self
._socket
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
113 self
._socket
.settimeout(1)
114 except socket
.error
as msg
:
115 self
._log
.error("error constructing socket %s" % msg
)
120 self
._log
.info("Trying to connect....")
121 self
._socket
.connect((self
.pong_ip
, self
.pong_port
))
122 self
._socket
.setsockopt(socket
.IPPROTO_TCP
, socket
.TCP_NODELAY
, 1)
123 self
._log
.info("Socket connected")
125 except socket
.error
as msg
:
133 if self
._socket
!= None:
134 req
= "rwpingpong-{}".format(self
._ping
_count
)
136 self
._log
.info("sending: %s" %req
)
137 self
._socket
.sendall(req
)
138 self
._ping
_count
+= 1
139 self
._request
_count
+= 1
140 except socket
.error
as msg
:
141 self
._log
.error("Error({}) sending data".format(msg
))
142 self
.close_socket(msg
)
145 time
.sleep(1.0/self
._send
_rate
)
147 self
._log
.info("Stopping send_ping")
152 if self
._socket
!= None:
154 respb
= self
._socket
.recv(1024)
155 except socket
.timeout
:
157 except socket
.error
as msg
:
158 self
._log
.error("Error({}) receiving data".format(msg
))
161 # self.close_socket(msg)
167 resp
= respb
.decode('UTF-8')
168 self
._response
_count
+= 1
169 self
._log
.info("receive: %s" % resp
)
171 self
._log
.info("Stopping recv_resp")
173 class PingServerHandler(tornado
.web
.RequestHandler
):
174 def initialize(self
, ping_instance
):
175 self
._ping
_instance
= ping_instance
178 response
= {'ip': self
._ping
_instance
.pong_ip
,
179 'port': self
._ping
_instance
.pong_port
}
183 def post(self
, args
):
184 target
= get_url_target(self
.request
.uri
)
185 body
= self
.request
.body
.decode("utf-8")
186 body_header
= self
.request
.headers
.get("Content-Type")
188 if "json" not in body_header
:
189 self
.write("Content-Type must be some kind of json 2")
194 json_dicts
= json
.loads(body
)
196 self
.write("Content-Type must be some kind of json 1")
200 if target
== "server":
201 if type(json_dicts
['port']) is not int:
205 if type(json_dicts
['ip']) not in (str, unicode):
209 self
._ping
_instance
.pong_ip
= json_dicts
['ip']
210 self
._ping
_instance
.pong_port
= json_dicts
['port']
218 class PingAdminStatusHandler(tornado
.web
.RequestHandler
):
219 def initialize(self
, ping_instance
):
220 self
._ping
_instance
= ping_instance
223 target
= get_url_target(self
.request
.uri
)
224 if target
== "state":
225 value
= "enabled" if self
._ping
_instance
.enabled
else "disabled"
227 response
= { 'adminstatus': value
}
234 def post(self
, args
):
235 target
= get_url_target(self
.request
.uri
)
236 body
= self
.request
.body
.decode("utf-8")
237 body_header
= self
.request
.headers
.get("Content-Type")
239 if "json" not in body_header
:
240 self
.write("Content-Type must be some kind of json 2")
245 json_dicts
= json
.loads(body
)
247 self
.write("Content-Type must be some kind of json 1")
251 if target
== "state":
252 if type(json_dicts
['enable']) is not bool:
256 if json_dicts
['enable']:
257 if not self
._ping
_instance
.enabled
:
258 self
._ping
_instance
.start()
260 self
._ping
_instance
.stop()
268 class PingStatsHandler(tornado
.web
.RequestHandler
):
269 def initialize(self
, ping_instance
):
270 self
._ping
_instance
= ping_instance
273 response
= {'ping-request-tx-count': self
._ping
_instance
.request_count
,
274 'ping-response-rx-count': self
._ping
_instance
.response_count
}
278 class PingRateHandler(tornado
.web
.RequestHandler
):
279 def initialize(self
, ping_instance
):
280 self
._ping
_instance
= ping_instance
283 response
= { 'rate': self
._ping
_instance
.rate
}
287 def post(self
, args
):
288 target
= get_url_target(self
.request
.uri
)
289 body
= self
.request
.body
.decode("utf-8")
290 body_header
= self
.request
.headers
.get("Content-Type")
292 if "json" not in body_header
:
297 json_dicts
= json
.loads(body
)
303 if type(json_dicts
['rate']) is not int:
307 self
._ping
_instance
.rate
= json_dicts
['rate']