3 * Copyright 2016 RIFT.IO Inc
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 //Supports localhost node polling subscriptions and pass through subscriptions to other websockets
22 //TODO REFACTOR: this needs to happen. there's too much boilerplate code in here.
23 //TODO Document after refactoring
24 //TODO Improved logging for debugging
27 var WebSocket
= require('ws');
28 var Request
= require('request');
29 var _
= require('lodash');
30 var constants
= require('./constants.js');
31 var Promise
= require('promise');
32 var url
= require('url');
33 var sockjs
= require('sockjs');
34 var websocket_multiplex
= require('websocket-multiplex');
35 var utils
= require('./utils.js');
36 var configurationAPI
= require('../modules/api/configuration.js');
39 var Subscriptions = function() {
41 this.socketServers
= {};
44 Subscriptions
.prototype.configure = function(config
) {
47 // 1. Setup SockJS server
49 this.service
= sockjs
.createServer(sockjs_opts
);
50 // 2. Setup multiplexing
51 this.multiplexer
= new websocket_multiplex
.MultiplexServer(this.service
);
53 this.service
.installHandlers(this.config
.httpServer
, {prefix
:'/multiplex'});
57 * [subscribe description]
59 * @param {String} req.body.url May be http, https, or ws
60 * @param {Function} req.body.transform A function that will transform
61 * the data before sending it out
62 * through the socket. Receives one
63 * argument, which is the data
64 * returned from the subscription.
65 * @param {Function} callback Function that will receive the SubscriptionData reference object
66 * @return {Object} SubscriptionReference An object containing the subscription information.
67 * @return {Number} SubscriptionReference.id The subscription ID
69 Subscriptions
.prototype.subscribe = function(req
, callback
) {
71 var URL
= req
.body
.url
;
72 var SubscriptionReference
;
73 var sessionId
= req
.session
.id
;
74 var protocolTest
= /^(.{2,5}):\/\//;
75 var protocol
= URL
.match(protocolTest
);
79 if (req
.query
['api_server']) {
80 var api_server_protocol
= req
.query
['api_server'].match(protocolTest
)[1];
81 var api_server_origin
= req
.query
['api_server'] + ':' + utils
.getPortForProtocol(api_server_protocol
);
82 origin
= api_server_origin
;
83 protocol
= api_server_protocol
;
85 // TODO: NEED A WAY (URL PARAM) TO TRIGGER THIS PART OF THE CODE
86 // WHICH IS NECESSARY FOR DEVELOPMENT ON MAC
87 // No protocol was passed with the url in the body. Assume req.protocol is protocol and construct URL
88 protocol
= req
.protocol
|| 'https';
89 // Converting relative URL to full path.
90 origin
= protocol
+ '://' + req
.headers
.host
92 var a
= url
.resolve(origin
, req
.baseUrl
);
93 var b
= url
.resolve(a
, URL
);
95 console
.log('DEBUG URL IS', URL
);
97 protocol
= protocol
[1]
100 return new Promise(function(resolve
, reject
) {
105 errorMessage
: 'SocketManager not configured yet. Cannot proceed'
109 self
.createWebSocketServer().then(function(successData
) {
111 self
.socketServers
[sessionId
+ successData
.id
] = successData
;
112 self
.setUpSocketInstance(protocol
, URL
, req
, self
.socketServers
[sessionId
+ successData
.id
].wss
, successData
.id
);
116 id
: self
.socketServers
[sessionId
+ successData
.id
].id
120 function(errorData
) {
123 errorMessage
: errorData
.error
129 Subscriptions
.prototype.setUpSocketInstance = function(protocol
, URL
, req
, wss
, channelId
) {
131 //Need to refactor this to make it more scalable/dynamic
134 self
.socketInstance(URL
, req
, wss
, PollingSocket
, channelId
);
137 self
.socketInstance(URL
, req
, wss
, PollingSocket
, channelId
);
140 self
.socketInstance(URL
, req
, wss
, WebSocket
, channelId
);
143 self
.socketInstance(URL
, req
, wss
, WebSocket
, channelId
);
148 Subscriptions
.prototype.createWebSocketServer = function() {
151 return new Promise(function(resolve
, reject
) {
156 wss
= self
.multiplexer
.registerChannel(self
.ID
);
165 Subscriptions
.prototype.socketInstance = function(url
, req
, wss
, Type
, channelId
) {
166 console
.log('Creating a new socketInstance for:', url
, 'sessionId:', req
.session
.id
);
169 var Connections
= [];
171 var sessionId
= req
.session
.id
;
173 var channelIdRef
= channelId
;
174 wss
.on('connection', function(conn
) {
175 console
.log('New connection to multiplex-server for channelId', channelIdRef
);
177 conn
.on('data', function(msg
) {
178 console
.log('Test purposes only. Received message from client:', msg
);
179 conn
.write('Test purposes only. Echo: ' + msg
);
183 if (Type
== PollingSocket
) {
184 Socket
= new Type(url
, req
, 1000, req
.body
);
186 Socket
= new Type(url
);
188 console
.log('Socket assigned for url', url
);
190 conn
.index
= Index
++;
191 // Add this client-connection into list of connections for this channelId/wss
192 Connections
.push(conn
);
194 conn
.on('close', function() {
195 // Remove the browser connection from list of Connections for this channelId/wss
196 Connections
.splice(conn
.index
, 1);
197 console
.log('splicing conn.index', conn
.index
,' for channel', channelIdRef
);
199 // Check if no other connections exist
200 if (Connections
.length
== 0) {
201 console
.log('No more connections for', channelId
, '. Will close socket server and downstream socket/poller.');
203 // Close downstream socket/poller
206 // Close socket server
209 // Remove from list of socketServers
210 delete self
.socketServers
[sessionId
+ wss
.id
];
212 // There is no unregisterChannel. Assuming
213 // sockjs/websocket-multiplex do the right
214 // things and cleanup after themselves.
216 console
.log('Error closing socket server: ', e
);
223 Socket
.onopen = function() {
224 console
.log('Opened a websocket to southbound server');
227 Socket
.onerror = function(error
) {
228 console
.log('Error on southbound connection. Error:', error
);
231 Socket
.onmessage = function(data
) {
234 if (req
.body
.transform
&& req
.body
.transform
.constructor.name
== "String") {
235 //someTransformObject[req.body.transform](data, send)
236 //req.body.transform(data, send);
238 if (Type
== PollingSocket
) {
245 function send(payload
) {
248 if (typeof payload
== 'string') {
249 var jsonPayload
= JSON
.parse(payload
);
250 is401
= jsonPayload
.statusCode
== 401;
253 is401
= payload
.statusCode
== 401;
259 for (i
= Connections
.length
- 1; i
>= 0; i
-= 1) {
260 // console.log('Sending payload to channelId:', channelId, ' on connection', i);
261 Connections
[i
].write(payload
);
267 console
.log('Error closing Socket')
276 function PollingSocket(url
, req
, interval
, config
) {
277 console
.log('Creating a new PollingSocket for url', url
, 'sessionId:', req
.session
.id
);
279 self
.isClosed
= false;
280 var requestHeaders
= {};
281 _
.extend(requestHeaders
, {
282 Cookie
: req
.get('Cookie')
285 var pollServer = function() {
287 url
: utils
.projectContextUrl(req
, url
),
288 method
: config
.method
|| 'GET',
289 headers
: requestHeaders
,
290 json
: config
.payload
,
291 rejectUnauthorized
: false,
292 forever
: constants
.FOREVER_ON
293 }, function(error
, response
, body
) {
295 console
.log('Error polling: ' + url
);
297 if (!self
.isClosed
) {
298 if(process
.env
.DISABLE_POLLING
!= "TRUE") {
299 self
.poll
= setTimeout(pollServer
, 1000 || interval
);
301 console
.log('Polling is disabled. Finishing request.')
303 var data
= response
.body
;
304 if (self
.onmessage
) {
305 self
.onmessage(data
);
314 PollingSocket
.prototype.close = function() {
315 console
.log('Closing PollingSocket');
317 this.isClosed
= true;
318 clearTimeout(self
.poll
);
322 module
.exports
= Subscriptions
;