|
|
|
@ -10,6 +10,7 @@ const { JSDOM } = require('jsdom'); |
|
|
|
|
const log = require('npmlog'); |
|
|
|
|
const pg = require('pg'); |
|
|
|
|
const dbUrlToConfig = require('pg-connection-string').parse; |
|
|
|
|
const metrics = require('prom-client'); |
|
|
|
|
const redis = require('redis'); |
|
|
|
|
const uuid = require('uuid'); |
|
|
|
|
const WebSocket = require('ws'); |
|
|
|
@ -183,6 +184,73 @@ const startServer = async () => { |
|
|
|
|
const redisSubscribeClient = await redisUrlToClient(redisParams, redisUrl); |
|
|
|
|
const redisClient = await redisUrlToClient(redisParams, redisUrl); |
|
|
|
|
|
|
|
|
|
// Collect metrics from Node.js
|
|
|
|
|
metrics.collectDefaultMetrics(); |
|
|
|
|
|
|
|
|
|
new metrics.Gauge({ |
|
|
|
|
name: 'pg_pool_total_connections', |
|
|
|
|
help: 'The total number of clients existing within the pool', |
|
|
|
|
collect() { |
|
|
|
|
this.set(pgPool.totalCount); |
|
|
|
|
}, |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
new metrics.Gauge({ |
|
|
|
|
name: 'pg_pool_idle_connections', |
|
|
|
|
help: 'The number of clients which are not checked out but are currently idle in the pool', |
|
|
|
|
collect() { |
|
|
|
|
this.set(pgPool.idleCount); |
|
|
|
|
}, |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
new metrics.Gauge({ |
|
|
|
|
name: 'pg_pool_waiting_queries', |
|
|
|
|
help: 'The number of queued requests waiting on a client when all clients are checked out', |
|
|
|
|
collect() { |
|
|
|
|
this.set(pgPool.waitingCount); |
|
|
|
|
}, |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
const connectedClients = new metrics.Gauge({ |
|
|
|
|
name: 'connected_clients', |
|
|
|
|
help: 'The number of clients connected to the streaming server', |
|
|
|
|
labelNames: ['type'], |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
connectedClients.set({ type: 'websocket' }, 0); |
|
|
|
|
connectedClients.set({ type: 'eventsource' }, 0); |
|
|
|
|
|
|
|
|
|
const connectedChannels = new metrics.Gauge({ |
|
|
|
|
name: 'connected_channels', |
|
|
|
|
help: 'The number of channels the streaming server is streaming to', |
|
|
|
|
labelNames: [ 'type', 'channel' ] |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
const redisSubscriptions = new metrics.Gauge({ |
|
|
|
|
name: 'redis_subscriptions', |
|
|
|
|
help: 'The number of Redis channels the streaming server is subscribed to', |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
// When checking metrics in the browser, the favicon is requested this
|
|
|
|
|
// prevents the request from falling through to the API Router, which would
|
|
|
|
|
// error for this endpoint:
|
|
|
|
|
app.get('/favicon.ico', (req, res) => res.status(404).end()); |
|
|
|
|
|
|
|
|
|
app.get('/api/v1/streaming/health', (req, res) => { |
|
|
|
|
res.writeHead(200, { 'Content-Type': 'text/plain' }); |
|
|
|
|
res.end('OK'); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
app.get('/metrics', async (req, res) => { |
|
|
|
|
try { |
|
|
|
|
res.set('Content-Type', metrics.register.contentType); |
|
|
|
|
res.end(await metrics.register.metrics()); |
|
|
|
|
} catch (ex) { |
|
|
|
|
log.error(ex); |
|
|
|
|
res.status(500).end(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @param {string[]} channels |
|
|
|
|
* @returns {function(): void} |
|
|
|
@ -240,6 +308,7 @@ const startServer = async () => { |
|
|
|
|
if (subs[channel].length === 0) { |
|
|
|
|
log.verbose(`Subscribe ${channel}`); |
|
|
|
|
redisSubscribeClient.subscribe(channel, onRedisMessage); |
|
|
|
|
redisSubscriptions.inc(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
subs[channel].push(callback); |
|
|
|
@ -261,6 +330,7 @@ const startServer = async () => { |
|
|
|
|
if (subs[channel].length === 0) { |
|
|
|
|
log.verbose(`Unsubscribe ${channel}`); |
|
|
|
|
redisSubscribeClient.unsubscribe(channel); |
|
|
|
|
redisSubscriptions.dec(); |
|
|
|
|
delete subs[channel]; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
@ -434,7 +504,7 @@ const startServer = async () => { |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @param {any} req |
|
|
|
|
* @param {string} channelName |
|
|
|
|
* @param {string|undefined} channelName |
|
|
|
|
* @returns {Promise.<void>} |
|
|
|
|
*/ |
|
|
|
|
const checkScopes = (req, channelName) => new Promise((resolve, reject) => { |
|
|
|
@ -537,10 +607,14 @@ const startServer = async () => { |
|
|
|
|
res.on('close', () => { |
|
|
|
|
unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener); |
|
|
|
|
unsubscribe(`${redisPrefix}${systemChannelId}`, listener); |
|
|
|
|
|
|
|
|
|
connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
subscribe(`${redisPrefix}${accessTokenChannelId}`, listener); |
|
|
|
|
subscribe(`${redisPrefix}${systemChannelId}`, listener); |
|
|
|
|
|
|
|
|
|
connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -554,7 +628,19 @@ const startServer = async () => { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
accountFromRequest(req).then(() => checkScopes(req, channelNameFromPath(req))).then(() => { |
|
|
|
|
const channelName = channelNameFromPath(req); |
|
|
|
|
|
|
|
|
|
// If no channelName can be found for the request, then we should terminate
|
|
|
|
|
// the connection, as there's nothing to stream back
|
|
|
|
|
if (!channelName) { |
|
|
|
|
const err = new Error('Unknown channel requested'); |
|
|
|
|
err.status = 400; |
|
|
|
|
|
|
|
|
|
next(err); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
accountFromRequest(req).then(() => checkScopes(req, channelName)).then(() => { |
|
|
|
|
subscribeHttpToSystemChannel(req, res); |
|
|
|
|
}).then(() => { |
|
|
|
|
next(); |
|
|
|
@ -849,6 +935,15 @@ const startServer = async () => { |
|
|
|
|
const streamToHttp = (req, res) => { |
|
|
|
|
const accountId = req.accountId || req.remoteAddress; |
|
|
|
|
|
|
|
|
|
const channelName = channelNameFromPath(req); |
|
|
|
|
|
|
|
|
|
connectedClients.labels({ type: 'eventsource' }).inc(); |
|
|
|
|
|
|
|
|
|
// In theory we'll always have a channel name, but channelNameFromPath can return undefined:
|
|
|
|
|
if (typeof channelName === 'string') { |
|
|
|
|
connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
res.setHeader('Content-Type', 'text/event-stream'); |
|
|
|
|
res.setHeader('Cache-Control', 'no-store'); |
|
|
|
|
res.setHeader('Transfer-Encoding', 'chunked'); |
|
|
|
@ -859,6 +954,14 @@ const startServer = async () => { |
|
|
|
|
|
|
|
|
|
req.on('close', () => { |
|
|
|
|
log.verbose(req.requestId, `Ending stream for ${accountId}`); |
|
|
|
|
// We decrement these counters here instead of in streamHttpEnd as in that
|
|
|
|
|
// method we don't have knowledge of the channel names
|
|
|
|
|
connectedClients.labels({ type: 'eventsource' }).dec(); |
|
|
|
|
// In theory we'll always have a channel name, but channelNameFromPath can return undefined:
|
|
|
|
|
if (typeof channelName === 'string') { |
|
|
|
|
connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
clearInterval(heartbeat); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
@ -913,40 +1016,18 @@ const startServer = async () => { |
|
|
|
|
res.end(JSON.stringify({ error: 'Not found' })); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
app.use(setRequestId); |
|
|
|
|
app.use(setRemoteAddress); |
|
|
|
|
app.use(allowCrossDomain); |
|
|
|
|
const api = express.Router(); |
|
|
|
|
|
|
|
|
|
app.get('/api/v1/streaming/health', (req, res) => { |
|
|
|
|
res.writeHead(200, { 'Content-Type': 'text/plain' }); |
|
|
|
|
res.end('OK'); |
|
|
|
|
}); |
|
|
|
|
app.use(api); |
|
|
|
|
|
|
|
|
|
api.use(setRequestId); |
|
|
|
|
api.use(setRemoteAddress); |
|
|
|
|
api.use(allowCrossDomain); |
|
|
|
|
|
|
|
|
|
app.get('/metrics', (req, res) => server.getConnections((err, count) => { |
|
|
|
|
res.writeHeader(200, { 'Content-Type': 'application/openmetrics-text; version=1.0.0; charset=utf-8' }); |
|
|
|
|
res.write('# TYPE connected_clients gauge\n'); |
|
|
|
|
res.write('# HELP connected_clients The number of clients connected to the streaming server\n'); |
|
|
|
|
res.write(`connected_clients ${count}.0\n`); |
|
|
|
|
res.write('# TYPE connected_channels gauge\n'); |
|
|
|
|
res.write('# HELP connected_channels The number of Redis channels the streaming server is subscribed to\n'); |
|
|
|
|
res.write(`connected_channels ${Object.keys(subs).length}.0\n`); |
|
|
|
|
res.write('# TYPE pg_pool_total_connections gauge\n'); |
|
|
|
|
res.write('# HELP pg_pool_total_connections The total number of clients existing within the pool\n'); |
|
|
|
|
res.write(`pg_pool_total_connections ${pgPool.totalCount}.0\n`); |
|
|
|
|
res.write('# TYPE pg_pool_idle_connections gauge\n'); |
|
|
|
|
res.write('# HELP pg_pool_idle_connections The number of clients which are not checked out but are currently idle in the pool\n'); |
|
|
|
|
res.write(`pg_pool_idle_connections ${pgPool.idleCount}.0\n`); |
|
|
|
|
res.write('# TYPE pg_pool_waiting_queries gauge\n'); |
|
|
|
|
res.write('# HELP pg_pool_waiting_queries The number of queued requests waiting on a client when all clients are checked out\n'); |
|
|
|
|
res.write(`pg_pool_waiting_queries ${pgPool.waitingCount}.0\n`); |
|
|
|
|
res.write('# EOF\n'); |
|
|
|
|
res.end(); |
|
|
|
|
})); |
|
|
|
|
|
|
|
|
|
app.use(authenticationMiddleware); |
|
|
|
|
app.use(errorMiddleware); |
|
|
|
|
|
|
|
|
|
app.get('/api/v1/streaming/*', (req, res) => { |
|
|
|
|
api.use(authenticationMiddleware); |
|
|
|
|
api.use(errorMiddleware); |
|
|
|
|
|
|
|
|
|
api.get('/api/v1/streaming/*', (req, res) => { |
|
|
|
|
channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => { |
|
|
|
|
const onSend = streamToHttp(req, res); |
|
|
|
|
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); |
|
|
|
@ -1141,15 +1222,16 @@ const startServer = async () => { |
|
|
|
|
* @typedef WebSocketSession |
|
|
|
|
* @property {any} socket |
|
|
|
|
* @property {any} request |
|
|
|
|
* @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions |
|
|
|
|
* @property {Object.<string, { channelName: string, listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @param {WebSocketSession} session |
|
|
|
|
* @param {string} channelName |
|
|
|
|
* @param {StreamParams} params |
|
|
|
|
* @returns {void} |
|
|
|
|
*/ |
|
|
|
|
const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => |
|
|
|
|
const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => { |
|
|
|
|
checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ |
|
|
|
|
channelIds, |
|
|
|
|
options, |
|
|
|
@ -1162,7 +1244,10 @@ const startServer = async () => { |
|
|
|
|
const stopHeartbeat = subscriptionHeartbeat(channelIds); |
|
|
|
|
const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering); |
|
|
|
|
|
|
|
|
|
connectedChannels.labels({ type: 'websocket', channel: channelName }).inc(); |
|
|
|
|
|
|
|
|
|
subscriptions[channelIds.join(';')] = { |
|
|
|
|
channelName, |
|
|
|
|
listener, |
|
|
|
|
stopHeartbeat, |
|
|
|
|
}; |
|
|
|
@ -1170,35 +1255,47 @@ const startServer = async () => { |
|
|
|
|
log.verbose(request.requestId, 'Subscription error:', err.toString()); |
|
|
|
|
socket.send(JSON.stringify({ error: err.toString() })); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const removeSubscription = (subscriptions, channelIds, request) => { |
|
|
|
|
log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`); |
|
|
|
|
|
|
|
|
|
const subscription = subscriptions[channelIds.join(';')]; |
|
|
|
|
|
|
|
|
|
if (!subscription) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
channelIds.forEach(channelId => { |
|
|
|
|
unsubscribe(`${redisPrefix}${channelId}`, subscription.listener); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec(); |
|
|
|
|
subscription.stopHeartbeat(); |
|
|
|
|
|
|
|
|
|
delete subscriptions[channelIds.join(';')]; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @param {WebSocketSession} session |
|
|
|
|
* @param {string} channelName |
|
|
|
|
* @param {StreamParams} params |
|
|
|
|
* @returns {void} |
|
|
|
|
*/ |
|
|
|
|
const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => |
|
|
|
|
const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => { |
|
|
|
|
channelNameToIds(request, channelName, params).then(({ channelIds }) => { |
|
|
|
|
log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`); |
|
|
|
|
|
|
|
|
|
const subscription = subscriptions[channelIds.join(';')]; |
|
|
|
|
removeSubscription(subscriptions, channelIds, request); |
|
|
|
|
}).catch(err => { |
|
|
|
|
log.verbose(request.requestId, 'Unsubscribe error:', err); |
|
|
|
|
|
|
|
|
|
if (!subscription) { |
|
|
|
|
return; |
|
|
|
|
// If we have a socket that is alive and open still, send the error back to the client:
|
|
|
|
|
// FIXME: In other parts of the code ws === socket
|
|
|
|
|
if (socket.isAlive && socket.readyState === socket.OPEN) { |
|
|
|
|
socket.send(JSON.stringify({ error: "Error unsubscribing from channel" })); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const { listener, stopHeartbeat } = subscription; |
|
|
|
|
|
|
|
|
|
channelIds.forEach(channelId => { |
|
|
|
|
unsubscribe(`${redisPrefix}${channelId}`, listener); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
stopHeartbeat(); |
|
|
|
|
|
|
|
|
|
delete subscriptions[channelIds.join(';')]; |
|
|
|
|
}).catch(err => { |
|
|
|
|
log.verbose(request.requestId, 'Unsubscription error:', err); |
|
|
|
|
socket.send(JSON.stringify({ error: err.toString() })); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @param {WebSocketSession} session |
|
|
|
@ -1219,16 +1316,20 @@ const startServer = async () => { |
|
|
|
|
subscribe(`${redisPrefix}${systemChannelId}`, listener); |
|
|
|
|
|
|
|
|
|
subscriptions[accessTokenChannelId] = { |
|
|
|
|
channelName: 'system', |
|
|
|
|
listener, |
|
|
|
|
stopHeartbeat: () => { |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
subscriptions[systemChannelId] = { |
|
|
|
|
channelName: 'system', |
|
|
|
|
listener, |
|
|
|
|
stopHeartbeat: () => { |
|
|
|
|
}, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -1255,6 +1356,8 @@ const startServer = async () => { |
|
|
|
|
ws.isAlive = true; |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
connectedClients.labels({ type: 'websocket' }).inc(); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @type {WebSocketSession} |
|
|
|
|
*/ |
|
|
|
@ -1265,17 +1368,18 @@ const startServer = async () => { |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
const onEnd = () => { |
|
|
|
|
const keys = Object.keys(session.subscriptions); |
|
|
|
|
const subscriptions = Object.keys(session.subscriptions); |
|
|
|
|
|
|
|
|
|
keys.forEach(channelIds => { |
|
|
|
|
const { listener, stopHeartbeat } = session.subscriptions[channelIds]; |
|
|
|
|
subscriptions.forEach(channelIds => { |
|
|
|
|
removeSubscription(session.subscriptions, channelIds.split(';'), req) |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
channelIds.split(';').forEach(channelId => { |
|
|
|
|
unsubscribe(`${redisPrefix}${channelId}`, listener); |
|
|
|
|
}); |
|
|
|
|
// ensure garbage collection:
|
|
|
|
|
session.socket = null; |
|
|
|
|
session.request = null; |
|
|
|
|
session.subscriptions = {}; |
|
|
|
|
|
|
|
|
|
stopHeartbeat(); |
|
|
|
|
}); |
|
|
|
|
connectedClients.labels({ type: 'websocket' }).dec(); |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
ws.on('close', onEnd); |
|
|
|
|