inject_user_key routine fixes
[osm/RO.git] / osm_ro / console_proxy_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openmano
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 '''
25 Implement like a proxy for TCP/IP in a separated thread.
26 It creates two sockets to bypass the TCP/IP packets among the fix console
27 server specified at class construction (console_host, console_port)
28 and a client that connect against the (host, port) specified also at construction
29
30 --------------------- -------------------------------
31 | OPENMANO | | VIM |
32 client 1 ----> | ConsoleProxyThread | ------> | Console server |
33 client 2 ----> | (host, port) | ------> |(console_host, console_server)|
34 ... -------------------- ------------------------------
35 '''
36 __author__="Alfonso Tierno"
37 __date__ ="$19-nov-2015 09:07:15$"
38
39 import socket
40 import select
41 import threading
42 import logging
43
44
45 class ConsoleProxyException(Exception):
46 '''raise when an exception has found'''
47 class ConsoleProxyExceptionPortUsed(ConsoleProxyException):
48 '''raise when the port is used'''
49
50 class ConsoleProxyThread(threading.Thread):
51 buffer_size = 4096
52 check_finish = 1 #frequency to check if requested to end in seconds
53
54 def __init__(self, host, port, console_host, console_port, log_level=None):
55 try:
56 threading.Thread.__init__(self)
57 self.console_host = console_host
58 self.console_port = console_port
59 self.host = host
60 self.port = port
61 self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
62 self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
63 self.server.bind((host, port))
64 self.server.listen(200)
65 #TODO timeout in a lock section can be used to autoterminate the thread
66 #when inactivity and timeout<time : set timeout=0 and terminate
67 #from outside, close class when timeout==0; set timeout=time+120 when adding a new console on this thread
68 #set self.timeout = time.time() + 120 at init
69 self.name = "ConsoleProxy " + console_host + ":" + str(console_port)
70 self.input_list = [self.server]
71 self.channel = {}
72 self.terminate = False #put at True from outside to force termination
73 self.logger = logging.getLogger('openmano.console')
74 if log_level:
75 self.logger.setLevel( getattr(logging, log_level) )
76
77 except (socket.error, socket.herror, socket.gaierror, socket.timeout) as e:
78 if e is socket.error and e.errno==98:
79 raise ConsoleProxyExceptionPortUsed("socket.error " + str(e))
80 raise ConsoleProxyException(type(e).__name__ + ": "+ (str(e) if len(e.args)==0 else str(e.args[0])) )
81
82 def run(self):
83 while 1:
84 try:
85 inputready, _, _ = select.select(self.input_list, [], [], self.check_finish)
86 except select.error as e:
87 self.logger.error("Exception on select %s: %s", type(e).__name__, str(e) )
88 self.on_terminate()
89
90 if self.terminate:
91 self.on_terminate()
92 self.logger.debug("Terminate because commanded")
93 break
94
95 for sock in inputready:
96 if sock == self.server:
97 self.on_accept()
98 else:
99 self.on_recv(sock)
100
101 def on_terminate(self):
102 while self.input_list:
103 if self.input_list[0] is self.server:
104 self.server.close()
105 del self.input_list[0]
106 else:
107 self.on_close(self.input_list[0], "Terminating thread")
108
109 def on_accept(self):
110 #accept
111 try:
112 clientsock, clientaddr = self.server.accept()
113 except (socket.error, socket.herror, socket.gaierror, socket.timeout) as e:
114 self.logger.error("Exception on_accept %s: %s", type(e).__name__, str(e) )
115 return False
116 #print self.name, ": Accept new client ", clientaddr
117
118 #connect
119 try:
120 forward = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
121 forward.connect((self.console_host, self.console_port))
122 name = "%s:%d => (%s:%d => %s:%d) => %s:%d" %\
123 (clientsock.getpeername() + clientsock.getsockname() + forward.getsockname() + forward.getpeername() )
124 self.logger.warn("new connection " + name)
125
126 self.input_list.append(clientsock)
127 self.input_list.append(forward)
128 info = { "name": name,
129 "clientsock" : clientsock,
130 "serversock" : forward
131 }
132 self.channel[clientsock] = info
133 self.channel[forward] = info
134 return True
135 except (socket.error, socket.herror, socket.gaierror, socket.timeout) as e:
136 self.logger.error("Exception on_connect to server %s:%d; %s: %s Close client side %s",
137 self.console_host, self.console_port, type(e).__name__, str(e), str(clientaddr) )
138 clientsock.close()
139 return False
140
141 def on_close(self, sock, cause):
142 if sock not in self.channel:
143 return #can happen if there is data ready to received at both sides and the channel has been deleted. QUITE IMPROBABLE but just in case
144 info = self.channel[sock]
145 #debug info
146 sockname = "client" if sock is info["clientsock"] else "server"
147 self.logger.warn("del connection %s %s at %s side", info["name"], str(cause), str(sockname) )
148 #close sockets
149 try:
150 # close the connection with client
151 info["clientsock"].close() # equivalent to do self.s.close()
152 except (socket.error, socket.herror, socket.gaierror, socket.timeout) as e:
153 self.logger.error("Exception on_close client socket %s: %s", type(e).__name__, str(e) )
154 try:
155 # close the connection with remote server
156 info["serversock"].close()
157 except (socket.error, socket.herror, socket.gaierror, socket.timeout) as e:
158 self.logger.error("Exception on_close server socket %s: %s", type(e).__name__, str(e) )
159
160 #remove objects from input_list
161 self.input_list.remove(info["clientsock"])
162 self.input_list.remove(info["serversock"])
163 # delete both objects from channel dict
164 del self.channel[info["clientsock"]]
165 del self.channel[info["serversock"]]
166
167 def on_recv(self, sock):
168 if sock not in self.channel:
169 return #can happen if there is data ready to received at both sides and the channel has been deleted. QUITE IMPROBABLE but just in case
170 info = self.channel[sock]
171 peersock = info["serversock"] if sock is info["clientsock"] else info["clientsock"]
172 try:
173 data = sock.recv(self.buffer_size)
174 if len(data) == 0:
175 self.on_close(sock, "peer closed")
176 else:
177 #print self.data
178 sock = peersock
179 peersock.send(data)
180 except (socket.error, socket.herror, socket.gaierror, socket.timeout) as e:
181 #print self.name, ": Exception %s: %s" % (type(e).__name__, str(e) )
182 self.on_close(sock, "Exception %s: %s" % (type(e).__name__, str(e) ))
183
184
185
186 #def start_timeout(self):
187 # self.timeout = time.time() + 120
188