Merge branch 'master' into pkg_mgmt
[osm/UI.git] / skyquake / framework / core / api_utils / sockets.js
1 /*
2 *
3 * Copyright 2016 RIFT.IO Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 //SOCKET MANAGER
19 // test
20 //Supports localhost node polling subscriptions and pass through subscriptions to other websockets
21 //
22 //TODO REFACTOR: this needs to happen. there's too much boilerplate code in here.
23 //TODO Document after refactoring
24 //TODO Improved logging for debugging
25 //TODO List of URLS
26
27 var WebSocket = require('ws');
28 var Request = require('request');
29 var _ = require('lodash');
30 var constants = require('./constants.js');
31 var Promise = require('promise');
32 var url = require('url');
33 var sockjs = require('sockjs');
34 var websocket_multiplex = require('websocket-multiplex');
35 var utils = require('./utils.js');
36
37
38 var Subscriptions = function() {
39 this.ID = 0;
40 this.socketServers = {};
41 };
42
43 Subscriptions.prototype.configure = function(config) {
44 this.config = config;
45 this.ready = true;
46 // 1. Setup SockJS server
47 var sockjs_opts = {};
48 this.service = sockjs.createServer(sockjs_opts);
49 // 2. Setup multiplexing
50 this.multiplexer = new websocket_multiplex.MultiplexServer(this.service);
51
52 this.service.installHandlers(this.config.httpServer, {prefix:'/multiplex'});
53 }
54
55 /**
56 * [subscribe description]
57 * @param {Object} req
58 * @param {String} req.body.url May be http, https, or ws
59 * @param {Function} req.body.transform A function that will transform
60 * the data before sending it out
61 * through the socket. Receives one
62 * argument, which is the data
63 * returned from the subscription.
64 * @param {Function} callback Function that will receive the SubscriptionData reference object
65 * @return {Object} SubscriptionReference An object containing the subscription information.
66 * @return {Number} SubscriptionReference.id The subscription ID
67 */
68 Subscriptions.prototype.subscribe = function(req, callback) {
69 var self = this;
70 var URL = req.body.url;
71 var SubscriptionReference;
72 var sessionId = req.session.id;
73 var protocolTest = /^(.{2,5}):\/\//;
74 var protocol = URL.match(protocolTest);
75
76 if (!protocol) {
77 var origin = '';
78 if (req.query['api_server']) {
79 var api_server_protocol = req.query['api_server'].match(protocolTest)[1];
80 var api_server_origin = req.query['api_server'] + ':' + utils.getPortForProtocol(api_server_protocol);
81 origin = api_server_origin;
82 protocol = api_server_protocol;
83 } else {
84 // TODO: NEED A WAY (URL PARAM) TO TRIGGER THIS PART OF THE CODE
85 // WHICH IS NECESSARY FOR DEVELOPMENT ON MAC
86 // No protocol was passed with the url in the body. Assume req.protocol is protocol and construct URL
87 protocol = req.protocol || 'https';
88 // Converting relative URL to full path.
89 origin = protocol + '://' + req.headers.host
90 }
91 var a = url.resolve(origin, req.baseUrl);
92 var b = url.resolve(a, URL);
93 URL = b;
94 console.log('DEBUG URL IS', URL);
95 } else {
96 protocol = protocol[1]
97 }
98
99 return new Promise(function(resolve, reject) {
100
101 if (!self.ready) {
102 return reject({
103 statusCode: 500,
104 errorMessage: 'SocketManager not configured yet. Cannot proceed'
105 })
106 }
107
108 self.createWebSocketServer().then(function(successData) {
109
110 self.socketServers[sessionId + successData.id] = successData;
111 self.setUpSocketInstance(protocol, URL, req, self.socketServers[sessionId + successData.id].wss, successData.id);
112 return resolve({
113 statusCode: 200,
114 data: {
115 id: self.socketServers[sessionId + successData.id].id
116 }
117 });
118 },
119 function(errorData) {
120 return reject({
121 statusCode: 503,
122 errorMessage: errorData.error
123 });
124 });
125 });
126 };
127
128 Subscriptions.prototype.setUpSocketInstance = function(protocol, URL, req, wss, channelId) {
129 var self = this;
130 //Need to refactor this to make it more scalable/dynamic
131 switch (protocol) {
132 case 'http':
133 self.socketInstance(URL, req, wss, PollingSocket, channelId);
134 break;
135 case 'https':
136 self.socketInstance(URL, req, wss, PollingSocket, channelId);
137 break;
138 case 'ws':
139 self.socketInstance(URL, req, wss, WebSocket, channelId);
140 break;
141 case 'wss':
142 self.socketInstance(URL, req, wss, WebSocket, channelId);
143 break;
144 }
145 }
146
147 Subscriptions.prototype.createWebSocketServer = function() {
148 var self = this;
149
150 return new Promise(function(resolve, reject) {
151 var wss = null;
152
153 self.ID++;
154
155 wss = self.multiplexer.registerChannel(self.ID);
156
157 return resolve({
158 id: self.ID,
159 wss: wss
160 });
161 });
162 };
163
164 Subscriptions.prototype.socketInstance = function(url, req, wss, Type, channelId) {
165 console.log('Creating a new socketInstance for:', url, 'sessionId:', req.session.id);
166 var self = this;
167 var Socket = null;
168 var Connections = [];
169 var Index = 0;
170 var sessionId = req.session.id;
171 var wssRef = wss;
172 var channelIdRef = channelId;
173 wss.on('connection', function(conn) {
174 console.log('New connection to multiplex-server for channelId', channelIdRef);
175
176 conn.on('data', function(msg) {
177 console.log('Test purposes only. Received message from client:', msg);
178 conn.write('Test purposes only. Echo: ' + msg);
179 });
180
181 if (!Socket) {
182 if (Type == PollingSocket) {
183 Socket = new Type(url, req, 1000, req.body);
184 } else {
185 Socket = new Type(url);
186 }
187 console.log('Socket assigned for url', url);
188 }
189 conn.index = Index++;
190 // Add this client-connection into list of connections for this channelId/wss
191 Connections.push(conn);
192
193 conn.on('close', function() {
194 // Remove the browser connection from list of Connections for this channelId/wss
195 Connections.splice(conn.index, 1);
196 console.log('splicing conn.index', conn.index,' for channel', channelIdRef);
197
198 // Check if no other connections exist
199 if (Connections.length == 0) {
200 console.log('No more connections for', channelId, '. Will close socket server and downstream socket/poller.');
201 try {
202 // Close downstream socket/poller
203 Socket.close();
204
205 // Close socket server
206 conn.end();
207
208 // Remove from list of socketServers
209 delete self.socketServers[sessionId + wss.id];
210
211 // There is no unregisterChannel. Assuming
212 // sockjs/websocket-multiplex do the right
213 // things and cleanup after themselves.
214 } catch (e) {
215 console.log('Error closing socket server: ', e);
216 }
217 Index = 0;
218 delete Socket;
219 }
220 });
221
222 Socket.onopen = function() {
223 console.log('Opened a websocket to southbound server');
224 };
225
226 Socket.onerror = function(error) {
227 console.log('Error on southbound connection. Error:', error);
228 }
229
230 Socket.onmessage = function(data) {
231 var i;
232 var self = this;
233 if (req.body.transform && req.body.transform.constructor.name == "String") {
234 //someTransformObject[req.body.transform](data, send)
235 //req.body.transform(data, send);
236 } else {
237 if (Type == PollingSocket) {
238 send(data);
239 } else {
240 send(data.data);
241 }
242 }
243
244 function send(payload) {
245 var is401 = false;
246 try {
247 if (typeof payload == 'string') {
248 var jsonPayload = JSON.parse(payload);
249 is401 = jsonPayload.statusCode == 401;
250 }
251 else {
252 is401 = payload.statusCode == 401;
253 }
254 } catch(e) {
255 payload = {}
256 }
257
258 for (i = Connections.length - 1; i >= 0; i -= 1) {
259 // console.log('Sending payload to channelId:', channelId, ' on connection', i);
260 Connections[i].write(payload);
261 };
262 if (is401) {
263 try {
264 Socket.close();
265 } catch (e) {
266 console.log('Error closing Socket')
267 }
268 }
269 }
270
271 };
272 });
273 };
274
275 function PollingSocket(url, req, interval, config) {
276 console.log('Creating a new PollingSocket for url', url, 'sessionId:', req.session.id);
277 var self = this;
278 self.isClosed = false;
279 var requestHeaders = {};
280 _.extend(requestHeaders, {
281 'Authorization': req.get('Authorization')
282 });
283
284 var pollServer = function() {
285 Request({
286 url: url,
287 method: config.method || 'GET',
288 headers: requestHeaders,
289 json: config.payload,
290 rejectUnauthorized: false,
291 forever: constants.FOREVER_ON
292 }, function(error, response, body) {
293 if (error) {
294 console.log('Error polling: ' + url);
295 } else {
296 if (!self.isClosed) {
297 self.poll = setTimeout(pollServer, 1000 || interval);
298 var data = response.body;
299 if (self.onmessage) {
300 self.onmessage(data);
301 }
302 }
303 }
304 });
305 };
306 pollServer();
307 };
308
309 PollingSocket.prototype.close = function() {
310 console.log('Closing PollingSocket');
311 var self = this;
312 this.isClosed = true;
313 clearTimeout(self.poll);
314 };
315
316
317 module.exports = Subscriptions;