|
|
@ -59,6 +59,7 @@ const subscribe = ({ channelName, params, onConnect }) => { |
|
|
|
subscriptionCounters[key] = subscriptionCounters[key] || 0; |
|
|
|
subscriptionCounters[key] = subscriptionCounters[key] || 0; |
|
|
|
|
|
|
|
|
|
|
|
if (subscriptionCounters[key] === 0) { |
|
|
|
if (subscriptionCounters[key] === 0) { |
|
|
|
|
|
|
|
// @ts-expect-error
|
|
|
|
sharedConnection.send(JSON.stringify({ type: 'subscribe', stream: channelName, ...params })); |
|
|
|
sharedConnection.send(JSON.stringify({ type: 'subscribe', stream: channelName, ...params })); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -74,7 +75,9 @@ const unsubscribe = ({ channelName, params, onDisconnect }) => { |
|
|
|
|
|
|
|
|
|
|
|
subscriptionCounters[key] = subscriptionCounters[key] || 1; |
|
|
|
subscriptionCounters[key] = subscriptionCounters[key] || 1; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// @ts-expect-error
|
|
|
|
if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) { |
|
|
|
if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) { |
|
|
|
|
|
|
|
// @ts-expect-error
|
|
|
|
sharedConnection.send(JSON.stringify({ type: 'unsubscribe', stream: channelName, ...params })); |
|
|
|
sharedConnection.send(JSON.stringify({ type: 'unsubscribe', stream: channelName, ...params })); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -83,11 +86,12 @@ const unsubscribe = ({ channelName, params, onDisconnect }) => { |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
const sharedCallbacks = { |
|
|
|
const sharedCallbacks = { |
|
|
|
connected () { |
|
|
|
connected() { |
|
|
|
subscriptions.forEach(subscription => subscribe(subscription)); |
|
|
|
subscriptions.forEach(subscription => subscribe(subscription)); |
|
|
|
}, |
|
|
|
}, |
|
|
|
|
|
|
|
|
|
|
|
received (data) { |
|
|
|
// @ts-expect-error
|
|
|
|
|
|
|
|
received(data) { |
|
|
|
const { stream } = data; |
|
|
|
const { stream } = data; |
|
|
|
|
|
|
|
|
|
|
|
subscriptions.filter(({ channelName, params }) => { |
|
|
|
subscriptions.filter(({ channelName, params }) => { |
|
|
@ -111,11 +115,11 @@ const sharedCallbacks = { |
|
|
|
}); |
|
|
|
}); |
|
|
|
}, |
|
|
|
}, |
|
|
|
|
|
|
|
|
|
|
|
disconnected () { |
|
|
|
disconnected() { |
|
|
|
subscriptions.forEach(subscription => unsubscribe(subscription)); |
|
|
|
subscriptions.forEach(subscription => unsubscribe(subscription)); |
|
|
|
}, |
|
|
|
}, |
|
|
|
|
|
|
|
|
|
|
|
reconnected () { |
|
|
|
reconnected() { |
|
|
|
}, |
|
|
|
}, |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
@ -138,6 +142,7 @@ const channelNameWithInlineParams = (channelName, params) => { |
|
|
|
* @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks |
|
|
|
* @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks |
|
|
|
* @return {function(): void} |
|
|
|
* @return {function(): void} |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
|
|
|
|
// @ts-expect-error
|
|
|
|
export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => { |
|
|
|
export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => { |
|
|
|
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); |
|
|
|
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); |
|
|
|
const accessToken = getState().getIn(['meta', 'access_token']); |
|
|
|
const accessToken = getState().getIn(['meta', 'access_token']); |
|
|
@ -147,19 +152,19 @@ export const connectStream = (channelName, params, callbacks) => (dispatch, getS |
|
|
|
// to using individual connections for each channel
|
|
|
|
// to using individual connections for each channel
|
|
|
|
if (!streamingAPIBaseURL.startsWith('ws')) { |
|
|
|
if (!streamingAPIBaseURL.startsWith('ws')) { |
|
|
|
const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), { |
|
|
|
const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), { |
|
|
|
connected () { |
|
|
|
connected() { |
|
|
|
onConnect(); |
|
|
|
onConnect(); |
|
|
|
}, |
|
|
|
}, |
|
|
|
|
|
|
|
|
|
|
|
received (data) { |
|
|
|
received(data) { |
|
|
|
onReceive(data); |
|
|
|
onReceive(data); |
|
|
|
}, |
|
|
|
}, |
|
|
|
|
|
|
|
|
|
|
|
disconnected () { |
|
|
|
disconnected() { |
|
|
|
onDisconnect(); |
|
|
|
onDisconnect(); |
|
|
|
}, |
|
|
|
}, |
|
|
|
|
|
|
|
|
|
|
|
reconnected () { |
|
|
|
reconnected() { |
|
|
|
onConnect(); |
|
|
|
onConnect(); |
|
|
|
}, |
|
|
|
}, |
|
|
|
}); |
|
|
|
}); |
|
|
@ -227,14 +232,19 @@ const handleEventSourceMessage = (e, received) => { |
|
|
|
const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => { |
|
|
|
const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => { |
|
|
|
const params = channelName.split('&'); |
|
|
|
const params = channelName.split('&'); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// @ts-expect-error
|
|
|
|
channelName = params.shift(); |
|
|
|
channelName = params.shift(); |
|
|
|
|
|
|
|
|
|
|
|
if (streamingAPIBaseURL.startsWith('ws')) { |
|
|
|
if (streamingAPIBaseURL.startsWith('ws')) { |
|
|
|
|
|
|
|
// @ts-expect-error
|
|
|
|
const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); |
|
|
|
const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken); |
|
|
|
|
|
|
|
|
|
|
|
ws.onopen = connected; |
|
|
|
// @ts-expect-error
|
|
|
|
ws.onmessage = e => received(JSON.parse(e.data)); |
|
|
|
ws.onopen = connected; |
|
|
|
ws.onclose = disconnected; |
|
|
|
ws.onmessage = e => received(JSON.parse(e.data)); |
|
|
|
|
|
|
|
// @ts-expect-error
|
|
|
|
|
|
|
|
ws.onclose = disconnected; |
|
|
|
|
|
|
|
// @ts-expect-error
|
|
|
|
ws.onreconnect = reconnected; |
|
|
|
ws.onreconnect = reconnected; |
|
|
|
|
|
|
|
|
|
|
|
return ws; |
|
|
|
return ws; |
|
|
@ -256,7 +266,7 @@ const createConnection = (streamingAPIBaseURL, accessToken, channelName, { conne |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
KNOWN_EVENT_TYPES.forEach(type => { |
|
|
|
KNOWN_EVENT_TYPES.forEach(type => { |
|
|
|
es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */ (e), received)); |
|
|
|
es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */(e), received)); |
|
|
|
}); |
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
es.onerror = /** @type {function(): void} */ (disconnected); |
|
|
|
es.onerror = /** @type {function(): void} */ (disconnected); |
|
|
|