update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b third try
[osm/UI.git] / skyquake / framework / core / api_utils / sockets.js
1 /*
2 *
3 * Copyright 2016 RIFT.IO Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 //SOCKET MANAGER
19 // test
20 //Supports localhost node polling subscriptions and pass through subscriptions to other websockets
21 //
22 //TODO REFACTOR: this needs to happen. there's too much boilerplate code in here.
23 //TODO Document after refactoring
24 //TODO Improved logging for debugging
25 //TODO List of URLS
26
27 var WebSocket = require('ws');
28 var Request = require('request');
29 var _ = require('lodash');
30 var constants = require('./constants.js');
31 var Promise = require('promise');
32 var url = require('url');
33 var sockjs = require('sockjs');
34 var websocket_multiplex = require('websocket-multiplex');
35 var utils = require('./utils.js');
36 var configurationAPI = require('../modules/api/configuration.js');
37
38
/**
 * Socket manager. Holds the multiplexed channels created for subscriptions
 * and the state shared by the prototype methods below.
 *
 * The instance is not usable until configure() has been called.
 *
 * @constructor
 */
var Subscriptions = function() {
    // Counter used by createWebSocketServer() to derive the next channel id.
    this.ID = 0;
    // Channel records ({id, wss}) keyed by sessionId + channel id.
    this.socketServers = {};
    // Flipped to true by configure(); subscribe() refuses to run while falsy.
    // Initialised explicitly so the flag always exists on the instance.
    this.ready = false;
};
43
/**
 * Wires the manager to an HTTP server: creates the SockJS service, wraps it
 * in a multiplex server and installs the SockJS handlers under '/multiplex'.
 *
 * @param {Object} config
 * @param {Object} config.httpServer HTTP server the SockJS handlers are
 *                                   installed on.
 * @throws {Error} When config or config.httpServer is missing.
 */
Subscriptions.prototype.configure = function(config) {
    if (!config || !config.httpServer) {
        throw new Error('Subscriptions.configure requires config.httpServer');
    }
    this.config = config;

    // 1. Setup SockJS server
    var sockjs_opts = {};
    var service = sockjs.createServer(sockjs_opts);

    // 2. Setup multiplexing
    var multiplexer = new websocket_multiplex.MultiplexServer(service);

    service.installHandlers(config.httpServer, {prefix: '/multiplex'});

    this.service = service;
    this.multiplexer = multiplexer;

    // Only mark the manager ready once every step above has succeeded, so a
    // failed setup can never leave subscribe() believing it may proceed.
    this.ready = true;
};
55
/**
 * Creates a multiplexed channel for the requested URL and starts the matching
 * downstream socket/poller for it.
 *
 * A URL without a protocol is treated as relative. It is resolved against
 * req.query.api_server (plus the port utils.getPortForProtocol returns for its
 * protocol) when that parameter is present, otherwise against
 * req.protocol + '://' + req.headers.host. In both cases req.baseUrl is
 * applied before the URL itself.
 *
 * Every failure is reported through the returned promise; nothing is thrown
 * synchronously for bad input.
 *
 * @param  {Object}   req
 * @param  {String}   req.body.url  May be http, https, ws or wss, or a
 *                                  relative path.
 * @param  {Object}   req.session   req.session.id is used to key the channel.
 * @param  {Function} callback      Unused; kept for interface compatibility.
 * @return {Promise}  Resolves with {statusCode: 200, data: {id}} where id is
 *                    the channel id. Rejects with {statusCode, errorMessage}:
 *                    400 for an unusable URL/protocol, 500 when the manager
 *                    is not configured or the downstream setup fails, 503
 *                    when the channel could not be created.
 */
Subscriptions.prototype.subscribe = function(req, callback) {
    var self = this;
    var protocolTest = /^(.{2,5}):\/\//;
    // Protocols setUpSocketInstance knows how to serve.
    var supportedProtocols = ['http', 'https', 'ws', 'wss'];

    function fail(statusCode, errorMessage) {
        return Promise.reject({
            statusCode: statusCode,
            errorMessage: errorMessage
        });
    }

    if (!self.ready) {
        return fail(500, 'SocketManager not configured yet. Cannot proceed');
    }

    var URL = req.body && req.body.url;
    if (typeof URL !== 'string' || URL.length === 0) {
        return fail(400, 'Subscription request is missing body.url');
    }

    var sessionId = req.session.id;
    var protocol;
    var protocolMatch = URL.match(protocolTest);

    if (protocolMatch) {
        protocol = protocolMatch[1];
    } else {
        var origin;
        var apiServer = req.query && req.query['api_server'];
        if (apiServer) {
            var apiServerMatch = (typeof apiServer === 'string') ? apiServer.match(protocolTest) : null;
            if (!apiServerMatch) {
                return fail(400, 'api_server must include a protocol, e.g. https://host');
            }
            protocol = apiServerMatch[1];
            origin = apiServer + ':' + utils.getPortForProtocol(protocol);
        } else {
            // TODO: NEED A WAY (URL PARAM) TO TRIGGER THIS PART OF THE CODE
            // WHICH IS NECESSARY FOR DEVELOPMENT ON MAC
            // No protocol was passed with the url in the body. Assume
            // req.protocol is the protocol and construct the URL from it.
            protocol = req.protocol || 'https';
            origin = protocol + '://' + req.headers.host;
        }
        try {
            // Converting relative URL to full path.
            URL = url.resolve(url.resolve(origin, req.baseUrl), URL);
        } catch (e) {
            return fail(400, 'Unable to resolve subscription url: ' + e.message);
        }
        console.log('DEBUG URL IS', URL);
    }

    // Refuse early: otherwise a channel would be registered and reported as
    // a success although no downstream socket would ever feed it.
    if (supportedProtocols.indexOf(protocol) === -1) {
        return fail(400, 'Unsupported subscription protocol: ' + protocol);
    }

    return self.createWebSocketServer().then(function(successData) {
        var key = sessionId + successData.id;
        self.socketServers[key] = successData;
        try {
            self.setUpSocketInstance(protocol, URL, req, successData.wss, successData.id);
        } catch (e) {
            // Do not keep a record for a channel that has no downstream.
            delete self.socketServers[key];
            return fail(500, 'Unable to set up subscription: ' + (e && e.message ? e.message : e));
        }
        return {
            statusCode: 200,
            data: {
                id: successData.id
            }
        };
    }, function(errorData) {
        return fail(503, errorData && errorData.error);
    });
};
128
/**
 * Picks the downstream socket type for a protocol and hands over to
 * socketInstance(): http/https are served by PollingSocket, ws/wss by
 * WebSocket.
 *
 * @param  {String} protocol   Protocol name without '://'.
 * @param  {String} URL        Absolute URL of the downstream resource.
 * @param  {Object} req        Originating request, passed through unchanged.
 * @param  {Object} wss        Multiplex channel for this subscription.
 * @param  {Number} channelId  Id of that channel.
 * @return {Boolean} true when a socket instance was set up, false when the
 *                   protocol is not supported (nothing is set up then).
 */
Subscriptions.prototype.setUpSocketInstance = function(protocol, URL, req, wss, channelId) {
    var Type;
    //Need to refactor this to make it more scalable/dynamic
    switch (protocol) {
        case 'http':
        case 'https':
            Type = PollingSocket;
            break;
        case 'ws':
        case 'wss':
            Type = WebSocket;
            break;
        default:
            // Make the previously silent no-op visible. Not thrown, because
            // callers invoke this from inside promise handlers.
            console.log('Unsupported protocol for subscription:', protocol, 'url:', URL);
            return false;
    }
    this.socketInstance(URL, req, wss, Type, channelId);
    return true;
};
147
/**
 * Registers a new channel on the multiplexer under the next numeric id.
 *
 * @return {Promise} Resolves with {id, wss}, where wss is whatever
 *                   multiplexer.registerChannel(id) returned. Rejects with
 *                   {error: String} - the shape subscribe() reads - when the
 *                   multiplexer is missing or the registration throws.
 */
Subscriptions.prototype.createWebSocketServer = function() {
    var self = this;

    return new Promise(function(resolve, reject) {
        if (!self.multiplexer) {
            return reject({
                error: 'Multiplexer is not available. Has configure() been called?'
            });
        }

        var nextId = self.ID + 1;
        var wss = null;

        try {
            wss = self.multiplexer.registerChannel(nextId);
        } catch (e) {
            return reject({
                error: (e && e.message) ? e.message : String(e)
            });
        }

        // Consume the id only once the channel really exists.
        self.ID = nextId;

        return resolve({
            id: nextId,
            wss: wss
        });
    });
};
164
/**
 * Bridges one multiplex channel to one downstream socket/poller.
 *
 * The downstream socket is created lazily when the first client connects to
 * the channel and is shared by every later client. Each downstream message is
 * broadcast to all connected clients. When the last client disconnects the
 * downstream socket is closed, the channel record is removed from
 * socketServers, and a later connection creates a fresh downstream socket.
 *
 * @param {String}   url        Downstream URL.
 * @param {Object}   req        Originating request. Reads req.session.id,
 *                              req.body and, for non-polling types,
 *                              req.session.passport.user.user.access_token.
 * @param {Object}   wss        Multiplex channel emitting 'connection'.
 * @param {Function} Type       Constructor for the downstream socket:
 *                              PollingSocket or a WebSocket-like class.
 * @param {Number}   channelId  Channel id; with the session id it forms the
 *                              key of this channel in socketServers.
 */
Subscriptions.prototype.socketInstance = function(url, req, wss, Type, channelId) {
    console.log('Creating a new socketInstance for:', url, 'sessionId:', req.session.id);
    var self = this;
    var Socket = null;
    var Connections = [];
    var Index = 0;
    var sessionId = req.session.id;

    // Turns a payload into the text that is written to clients. Strings pass
    // through untouched, Buffers keep their text form, and other objects are
    // serialised as JSON.
    // NOTE(review): assumes the channel write path stringifies its argument,
    // which would turn a plain object into "[object Object]" - confirm
    // against the sockjs/websocket-multiplex versions in use.
    function serialise(payload) {
        if (typeof payload === 'string') {
            return payload;
        }
        if (Buffer.isBuffer(payload)) {
            return String(payload);
        }
        return JSON.stringify(payload);
    }

    // Broadcasts one downstream payload to every connected client and closes
    // the downstream socket when the payload reports a 401.
    function send(downstream, payload) {
        var is401 = false;
        try {
            var parsed = (typeof payload == 'string') ? JSON.parse(payload) : payload;
            is401 = parsed.statusCode == 401;
        } catch (e) {
            // Unparsable or empty payloads are forwarded as an empty object.
            payload = {};
        }

        var message = serialise(payload);
        for (var i = Connections.length - 1; i >= 0; i -= 1) {
            Connections[i].write(message);
        }

        if (is401) {
            try {
                downstream.close();
            } catch (e) {
                console.log('Error closing Socket');
            }
        }
    }

    // Creates the downstream socket and attaches its handlers exactly once.
    function openDownstream() {
        var downstream;
        if (Type === PollingSocket) {
            downstream = new Type(url, req, 1000, req.body);
        } else {
            downstream = new Type(url, ['Bearer', req.session.passport.user.user['access_token']]);
        }
        console.log('Socket assigned for url', url);

        downstream.onopen = function() {
            console.log('Opened a websocket to southbound server');
        };

        downstream.onerror = function(error) {
            console.log('Error on southbound connection. Error:', error);
        };

        downstream.onmessage = function(data) {
            if (req.body.transform && req.body.transform.constructor.name == "String") {
                // Named transforms are not implemented yet; such messages
                // are dropped, as before.
                return;
            }
            // PollingSocket delivers the body itself; WebSocket-like types
            // deliver an event carrying it in .data.
            send(downstream, (Type === PollingSocket) ? data : data.data);
        };

        return downstream;
    }

    wss.on('connection', function(conn) {
        console.log('New connection to multiplex-server for channelId', channelId);

        conn.on('data', function(msg) {
            console.log('Test purposes only. Received message from client:', msg);
            conn.write('Test purposes only. Echo: ' + msg);
        });

        if (!Socket) {
            Socket = openDownstream();
        }

        // Kept for logging only; removal below goes by identity because the
        // array positions shift whenever an earlier connection closes.
        conn.index = Index++;
        // Add this client-connection into list of connections for this channelId/wss
        Connections.push(conn);

        conn.on('close', function() {
            // Remove the browser connection from list of Connections for this channelId/wss
            var position = Connections.indexOf(conn);
            if (position !== -1) {
                Connections.splice(position, 1);
            }
            console.log('splicing conn.index', conn.index,' for channel', channelId);

            // Check if no other connections exist
            if (Connections.length == 0) {
                console.log('No more connections for', channelId, '. Will close socket server and downstream socket/poller.');
                try {
                    // Close downstream socket/poller
                    if (Socket) {
                        Socket.close();
                    }

                    // Close socket server
                    conn.end();
                } catch (e) {
                    console.log('Error closing socket server: ', e);
                }

                // Remove from list of socketServers, using the same key
                // subscribe() stored the record under.
                delete self.socketServers[sessionId + channelId];

                // There is no unregisterChannel. Assuming
                // sockjs/websocket-multiplex do the right
                // things and cleanup after themselves.

                Index = 0;
                // Drop the reference so the next client gets a new socket.
                Socket = null;
            }
        });
    });
};
275
/**
 * Socket-like poller: repeatedly issues an HTTP request and hands each
 * response body to this.onmessage, so it can stand in for a WebSocket.
 *
 * Handlers are plain properties assigned by the owner after construction:
 *   onmessage(body) - called with every response body;
 *   onerror(error)  - called when a request fails.
 * A failed request ends the polling loop; it is not retried.
 *
 * @constructor
 * @param {String} url       URL to poll (passed through
 *                           utils.projectContextUrl together with req).
 * @param {Object} req       Originating request; its Cookie header is
 *                           forwarded and req.session.id is logged.
 * @param {Number} interval  Delay between polls in ms. Defaults to 1000 when
 *                           missing or not a positive number.
 * @param {Object} [config]  Optional request settings.
 * @param {String} [config.method]  HTTP method, 'GET' by default.
 * @param {*}      [config.payload] Passed as the request's `json` option.
 */
function PollingSocket(url, req, interval, config) {
    console.log('Creating a new PollingSocket for url', url, 'sessionId:', req.session.id);
    var self = this;
    var options = config || {};
    var delay = (typeof interval === 'number' && interval > 0) ? interval : 1000;
    var requestHeaders = {
        Cookie: req.get('Cookie')
    };
    self.isClosed = false;

    var pollServer = function() {
        Request({
            url: utils.projectContextUrl(req, url),
            method: options.method || 'GET',
            headers: requestHeaders,
            json: options.payload,
            // NOTE(review): certificate verification is disabled for these
            // requests - confirm this is intended outside development.
            rejectUnauthorized: false,
            forever: constants.FOREVER_ON
        }, function(error, response, body) {
            if (error) {
                console.log('Error polling: ' + url);
                // Surface the failure to the owner instead of only logging.
                if (!self.isClosed && self.onerror) {
                    self.onerror(error);
                }
                return;
            }
            if (self.isClosed) {
                return;
            }
            // Schedule the next poll before delivering the message, so a
            // close() triggered by the handler can still cancel the timer.
            if (process.env.DISABLE_POLLING != "TRUE") {
                self.poll = setTimeout(pollServer, delay);
            } else {
                console.log('Polling is disabled. Finishing request.')
            }
            if (self.onmessage) {
                self.onmessage(response.body);
            }
        });
    };
    pollServer();
}
313
/**
 * Stops the poller: marks it closed, so a response that is still in flight
 * is ignored, and cancels the timer of the next scheduled poll.
 */
PollingSocket.prototype.close = function() {
    console.log('Closing PollingSocket');
    this.isClosed = true;
    clearTimeout(this.poll);
};
320
321
322 module.exports = Subscriptions;