openmano first code upload
[osm/RO.git] / console_proxy_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U.
5 # This file is part of openmano
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 Implements a TCP/IP proxy that runs in a separate thread.
26 For each client that connects to the (host, port) given at construction, it
27 opens a second socket to the fixed console server given at construction
28 (console_host, console_port) and relays the TCP/IP data between both sockets.
29
30 --------------------- -------------------------------
31 | OPENMANO | | VIM |
32 client 1 ----> | ConsoleProxyThread | ------> | Console server |
33 client 2 ----> | (host, port) | ------> |(console_host, console_port)|
34 ... -------------------- ------------------------------
35 '''
# Module metadata: original author and creation timestamp.
__author__ = "Alfonso Tierno"
__date__ = "$19-nov-2015 09:07:15$"
38
39 import socket
40 import select
41 import threading
42
43
class ConsoleProxyException(Exception):
    """Base error raised when the console proxy cannot be set up."""
class ConsoleProxyExceptionPortUsed(ConsoleProxyException):
    """Error raised when the requested listening port is already in use."""
48
class ConsoleProxyThread(threading.Thread):
    """Relay TCP connections between local clients and a fixed console server.

    The thread listens on (host, port). For every client accepted there it
    opens a second connection to (console_host, console_port) and copies the
    bytes received on either socket to the other one, until one side closes
    or fails. Setting ``terminate`` to True from outside makes ``run`` close
    every socket and return within about ``check_finish`` seconds.
    """

    buffer_size = 4096
    check_finish = 1  # frequency to check if requested to end in seconds

    # Socket-level failures that are reported and handled, not propagated.
    _socket_errors = (socket.error, socket.herror, socket.gaierror, socket.timeout)

    def __init__(self, host, port, console_host, console_port):
        """Create the listening socket of the proxy.

        Params:
            host, port: local address where clients will connect.
            console_host, console_port: address of the console server that
                every accepted client is forwarded to.
        Raises:
            ConsoleProxyExceptionPortUsed: (host, port) is already in use.
            ConsoleProxyException: any other socket error while creating,
                binding or listening on the server socket.
        """
        threading.Thread.__init__(self)
        self.console_host = console_host
        self.console_port = console_port
        self.host = host
        self.port = port
        self.server = None
        try:
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.bind((host, port))
            self.server.listen(200)
        except self._socket_errors as e:
            import errno  # local import: only needed on this error path

            # Do not leak the half-initialized listening socket.
            if self.server is not None:
                self.server.close()
            # isinstance (not "is") is required to match an exception
            # instance; errno.EADDRINUSE is portable, unlike a literal 98.
            if isinstance(e, socket.error) and e.errno == errno.EADDRINUSE:
                raise ConsoleProxyExceptionPortUsed("socket.error " + str(e))
            raise ConsoleProxyException(
                type(e).__name__ + ": " + (str(e) if len(e.args) == 0 else str(e.args[0])))
        # TODO timeout in a lock section can be used to autoterminate the thread
        # when inactivity and timeout<time : set timeout=0 and terminate
        # from outside, close class when timeout==0; set timeout=time+120 when
        # adding a new console on this thread
        # set self.timeout = time.time() + 120 at init
        self.name = "ConsoleProxy " + console_host + ":" + str(console_port)
        self.input_list = [self.server]  # sockets watched by select()
        self.channel = {}  # socket -> info dict shared by both ends of a connection
        self.terminate = False  # put at True from outside to force termination

    def _log(self, *parts):
        """Print the thread name followed by parts, separated by one space."""
        print(" ".join(str(part) for part in (self.name,) + parts))

    def run(self):
        """Serve connections until ``terminate`` is set or select() fails."""
        while True:
            try:
                inputready, _, _ = select.select(self.input_list, [], [], self.check_finish)
            except select.error as e:
                self._log(": Exception on select %s: %s" % (type(e).__name__, str(e)))
                self.on_terminate()
                # Every socket is closed now; leave the loop instead of
                # iterating over an undefined or stale ready list.
                break

            if self.terminate:
                self.on_terminate()
                self._log(": Terminate because commanded")
                break

            for sock in inputready:
                if sock == self.server:
                    self.on_accept()
                else:
                    self.on_recv(sock)

    def on_terminate(self):
        """Close the listening socket and every open connection."""
        while self.input_list:
            sock = self.input_list[0]
            if sock is self.server:
                self.server.close()
                del self.input_list[0]
            elif sock in self.channel:
                # on_close removes both ends of the connection from input_list
                self.on_close(sock, "Terminating thread")
            else:
                # Socket without channel entry: close and drop it here so
                # that the loop always makes progress.
                sock.close()
                del self.input_list[0]

    def on_accept(self):
        """Accept a new client and connect it to the console server.

        Returns True when both sockets are established and registered,
        False when accepting the client or connecting to the server failed.
        """
        # accept
        try:
            clientsock, clientaddr = self.server.accept()
        except self._socket_errors as e:
            self._log(": Exception on_accept %s: %s" % (type(e).__name__, str(e)))
            return False

        # connect
        forward = None
        try:
            forward = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            forward.connect((self.console_host, self.console_port))
            name = "%s:%d => (%s:%d => %s:%d) => %s:%d" % (
                clientsock.getpeername() + clientsock.getsockname() +
                forward.getsockname() + forward.getpeername())
        except self._socket_errors as e:
            self._log(": Exception on_connect to server %s:%d; %s: %s" % (
                self.console_host, self.console_port, type(e).__name__, str(e)))
            self._log(": Close client side ", clientaddr)
            clientsock.close()
            # Also release the server-side socket, otherwise it leaks.
            if forward is not None:
                forward.close()
            return False

        self._log(": new connection " + name)
        self.input_list.append(clientsock)
        self.input_list.append(forward)
        info = {
            "name": name,
            "clientsock": clientsock,
            "serversock": forward,
        }
        self.channel[clientsock] = info
        self.channel[forward] = info
        return True

    def on_close(self, sock, cause):
        """Close both ends of the connection that sock belongs to.

        Params:
            sock: either socket of the connection; only used to report at
                which side the close was originated.
            cause: text included in the log message.
        """
        info = self.channel.get(sock)
        if info is None:
            # can happen if there is data ready to be received at both sides
            # and the channel has been deleted. QUITE IMPROBABLE but just in case
            return
        # debug info
        sockname = "client" if sock is info["clientsock"] else "server"
        self._log(": del connection %s %s at %s side" % (info["name"], cause, sockname))

        for key, label in (("clientsock", "client"), ("serversock", "server")):
            member = info[key]
            try:
                member.close()
            except self._socket_errors as e:
                self._log(": Exception on_close %s socket %s: %s" % (
                    label, type(e).__name__, str(e)))
            # Tolerant removal: never fail on an already inconsistent state.
            if member in self.input_list:
                self.input_list.remove(member)
            self.channel.pop(member, None)

    def on_recv(self, sock):
        """Forward the data available at sock to the other end of its connection.

        The connection is closed when sock reports end of stream or when
        receiving or sending fails.
        """
        info = self.channel.get(sock)
        if info is None:
            # can happen if there is data ready to be received at both sides
            # and the channel has been deleted. QUITE IMPROBABLE but just in case
            return
        peersock = info["serversock"] if sock is info["clientsock"] else info["clientsock"]
        failing_side = sock  # side reported if an exception happens
        try:
            data = sock.recv(self.buffer_size)
            if len(data) == 0:
                self.on_close(sock, "peer closed")
                return
            # From here on a failure belongs to the receiving end.
            failing_side = peersock
            # sendall: send() may write only part of the buffer and the
            # remainder would be silently lost.
            peersock.sendall(data)
        except self._socket_errors as e:
            self.on_close(failing_side, "Exception %s: %s" % (type(e).__name__, str(e)))
178
179
180
181 #def start_timeout(self):
182 # self.timeout = time.time() + 120
183