-
Notifications
You must be signed in to change notification settings - Fork 17
/
index.ts
144 lines (122 loc) · 4.45 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import { Observable, fromEvent, interval, timer, empty, concat } from 'rxjs';
import { tap, retryWhen, scan, startWith, switchMap, take, repeat } from 'rxjs/operators';
export interface IOptions {
/**
* Period of the interval to run the source$
*/
interval: number;
/**
* How many attempts on error, before throwing definitely to polling subscriber
*/
attempts?: number;
/**
* Strategy taken on source$ errors, with attempts to recover.
*
* 'exponential' will retry waiting an increasing exponential time between attempts.
* You can pass the unit amount, which will be multiplied to the exponential factor.
*
* 'random' will retry waiting a random time between attempts. You can pass the range of randomness.
*
* 'consecutive' will retry waiting a constant time between attempts. You can
* pass the constant, otherwise the polling interval will be used.
*/
backoffStrategy?: 'exponential' | 'random' | 'consecutive';
/**
* Exponential delay factors (2, 4, 16, 32...) will be multiplied to the unit
* to get final amount if 'exponential' strategy is used.
*/
exponentialUnit?: number;
/**
* Range of milli-seconds to pick a random delay between error retries if 'random'
* strategy is used.
*/
randomRange?: [number, number];
/**
* Constant time to delay error retries if 'consecutive' strategy is used
*/
constantTime?: number;
/**
* Flag to enable background polling, ie polling even when the browser is inactive.
*/
backgroundPolling?: boolean;
}
const defaultOptions: Partial<IOptions> = {
attempts: 9,
backoffStrategy: 'exponential',
exponentialUnit: 1000, // 1 second
randomRange: [1000, 10000],
backgroundPolling: false
};
/**
* Run a polling stream for the source$
* @param request$ Source Observable which will be ran every interval
* @param userOptions Polling options
*/
export default function polling<T>(request$: Observable<T>, userOptions: IOptions): Observable<T> {
const options = Object.assign({}, defaultOptions, userOptions);
/**
* Currently any new error, after recover, continues the series of increasing
* delays, like 2 consequent errors would do. This is a bug of RxJS. To workaround
* the issue we use the difference with the counter value at the last recover.
* @see https://github.com/ReactiveX/rxjs/issues/1413
*/
let allErrorsCount = 0;
let lastRecoverCount = 0;
return fromEvent(document, 'visibilitychange').pipe(
startWith(null),
switchMap(() => {
if (isPageActive() || options.backgroundPolling) {
const firstRequest$ = request$;
const polling$ = interval(options.interval).pipe(
take(1),
switchMap(() => request$),
repeat()
);
return concat(firstRequest$, polling$).pipe(
retryWhen(errors$ => {
return errors$.pipe(
scan(
({ errorCount, error }, err) => {
return { errorCount: errorCount + 1, error: err };
},
{ errorCount: 0, error: null }
),
switchMap(({ errorCount, error }) => {
allErrorsCount = errorCount;
const consecutiveErrorsCount = allErrorsCount - lastRecoverCount;
// If already tempted too many times don't retry
if (consecutiveErrorsCount > options.attempts) throw error;
const delay = getStrategyDelay(consecutiveErrorsCount, options);
return timer(delay, null);
})
);
})
);
}
return empty();
}),
tap<T>(() => {
// Update the counter after every successful polling
lastRecoverCount = allErrorsCount;
})
);
}
function isPageActive(): boolean {
return !Boolean(document.hidden);
}
function getStrategyDelay(consecutiveErrorsCount: number, options: IOptions): number {
switch (options.backoffStrategy) {
case 'exponential':
return Math.pow(2, consecutiveErrorsCount - 1) * options.exponentialUnit;
case 'random':
const [min, max] = options.randomRange;
const range = max - min;
return Math.floor(Math.random() * range) + min;
case 'consecutive':
return options.constantTime || options.interval;
default:
console.error(`${options.backoffStrategy} is not a backoff strategy supported by rx-polling`);
// Return a value anyway to avoid throwing
return options.constantTime || options.interval;
}
}