-
Notifications
You must be signed in to change notification settings - Fork 0
/
rx.js
230 lines (213 loc) · 7.49 KB
/
rx.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import {
BehaviorSubject,
Subscribable,
Subject,
Unsubscribable,
} from 'rxjs'
import { translated } from './languageHelper'
import PromisE from './PromisE'
import {
deferred,
hasValue,
isArr,
isFn,
isPositiveNumber,
isSubjectLike,
} from './utils'
// Default (English) messages used by this module.
// `timedout` is the default rejection message of `subjectAsPromise`.
const textsCap = {
timedout: 'request timed out before an expected value is received'
}
// NOTE(review): the return value is discarded, so `translated` presumably updates `textsCap` in place
// (translation and/or capitalisation, going by the names) — confirm in `./languageHelper`.
translated(textsCap, true)
// Sentinel for `copyRxSubject`: a value that is strictly equal to this symbol is never pushed to the copy subject.
export const IGNORE_UPDATE_SYMBOL = Symbol('ignore-rx-subject-update')
/**
 * @name    copyRxSubject
 * @summary creates a new subject that automatically copies the value of the source subject.
 *
 * @description Changes are applied unidirectionally from the source subject(s) to the destination subject.
 * Changes on the destination subject are NOT applied back to the source subject(s).
 *
 * The source(s) are only subscribed to while `rxCopy` itself has subscribers: every `rxCopy.subscribe()` call
 * creates its own source subscription(s), which are handed to `unsubscribe()` when the returned subscription
 * is unsubscribed.
 *
 * @param   {Subscribable|Array} rxSource RxJS source subject(s). If Array, value of `rxCopy` will also be Array.
 *                                        Array items that are not subject-like are wrapped in a `BehaviorSubject`.
 * @param   {Subscribable} rxCopy         (optional) RxJS copy/destination subject
 *                                        Default: `new BehaviorSubject()`
 * @param   {Function}     valueModifier  (optional) callback to modify the value before copying from `rxSource`.
 *                                        Accepts async functions.
 *                                        Return `IGNORE_UPDATE_SYMBOL` to skip an update.
 *                                        Errors thrown while handling source updates are ignored.
 *                                        Args: `newValue, previousValue, rxCopy`
 * @param   {Number}       defer          (optional) if greater than `0`, source updates are applied through
 *                                        `deferred(fn, defer)` (presumably a debounce delay in milliseconds —
 *                                        confirm in `./utils`).
 *
 * @returns {Subscribable} rxCopy
 */
export const copyRxSubject = (
    rxSource,
    rxCopy,
    valueModifier,
    defer,
) => {
    const sourceIsArr = isArr(rxSource)
    // wrap plain array items so that every entry exposes `.value` and `.subscribe()`
    if (sourceIsArr) rxSource = rxSource.map(x => !isSubjectLike(x)
        ? new BehaviorSubject(x)
        : x
    )
    const gotSource = sourceIsArr || isSubjectLike(rxSource)
    const gotModifier = isFn(valueModifier)
    const isValid = value => value !== IGNORE_UPDATE_SYMBOL

    if (!isSubjectLike(rxCopy)) {
        let initialValue = !gotSource
            ? undefined
            : !sourceIsArr
                ? rxSource.value
                : rxSource.map(x => x.value)
        rxCopy = new BehaviorSubject()
        if (gotModifier) initialValue = valueModifier(
            initialValue,
            undefined,
            rxCopy
        )
        const isThenable = !!initialValue && isFn(initialValue.then)
        if (isThenable) {
            // async modifier: push the resolved value instead of storing the promise itself as the value
            initialValue.then(
                value => { isValid(value) && rxCopy.next(value) },
                () => { }, // ignore rejections, consistent with modifier errors on source updates
            )
        } else if (isValid(initialValue)) {
            rxCopy.next(initialValue)
        }
    }
    if (!gotSource) return rxCopy

    const subscribeOrg = rxCopy.subscribe.bind(rxCopy)
    rxCopy.subscribe = (...args) => {
        let unsubscribed = false
        let setValue = async value => {
            if (unsubscribed) return
            try {
                value = !gotModifier
                    ? value
                    : await valueModifier(
                        value,
                        rxCopy.value,
                        rxCopy
                    )
                isValid(value) && rxCopy.next(value)
            } catch (_) { } // ignore if valueModifier threw exception
        }
        if (defer > 0) setValue = deferred(setValue, defer)

        // latest value received from each source (array sources only)
        const values = []
        // the source(s) are subscribed to BEFORE the subscriber is attached to `rxCopy`
        const subs = !sourceIsArr
            ? rxSource.subscribe(value => setValue(value))
            : rxSource.map((x, i) =>
                x?.subscribe(value => {
                    values[i] = value
                    // Emit a snapshot: re-using `values` itself would retroactively mutate every previously
                    // emitted array and make `previousValue === newValue` inside `valueModifier`.
                    // `slice()` keeps the array shape (including holes) exactly as it was.
                    setValue(values.slice())
                })
            )
        const sub = subscribeOrg(...args)
        const unsubscribeOrg = sub.unsubscribe
        sub.unsubscribe = (...unsubArgs) => {
            if (unsubscribed) return
            unsubscribed = true
            unsubscribeOrg.call(sub, ...unsubArgs)
            unsubscribe(subs)
        }
        return sub
    }
    return rxCopy
}
/**
 * @name    getRxInterval
 * @summary creates a `BehaviorSubject` that increments its own value at a fixed interval.
 *
 * @param   {Number|String} initialValue (optional) starting value. Parsed as a base-10 integer;
 *                                       anything that does not parse to a non-zero integer becomes `0`.
 *                                       Default: `0`
 * @param   {Number}  delay       (optional) interval duration in milliseconds.
 *                                Default: `1000`
 * @param   {Boolean} autoStart   (optional) whether to start the interval immediately.
 *                                Default: `true`
 * @param   {Number}  incrementBy (optional) amount added to the current value on every tick.
 *                                Default: `1`
 *
 * @returns {BehaviorSubject} subject extended with:
 *  - `autoStart`, `delay`, `incrementBy`: the supplied settings.
 *      `delay` is read whenever `start()` is invoked and `incrementBy` is read on every tick.
 *  - `pause()`: stops the timer and keeps the current value
 *  - `start()`: (re)starts the timer
 *  - `stop()`: stops the timer and resets the value to `0`
 */
export const getRxInterval = (
    initialValue = 0,
    delay = 1000,
    autoStart = true,
    incrementBy = 1
) => {
    let intervalId
    const rxInterval = new BehaviorSubject(parseInt(initialValue, 10) || 0)
    rxInterval.autoStart = autoStart
    rxInterval.delay = delay
    rxInterval.incrementBy = incrementBy
    rxInterval.pause = () => clearInterval(intervalId)
    rxInterval.start = () => {
        // Clear any running timer first. Otherwise a repeated `start()` would overwrite `intervalId`,
        // leaving the previous timer running with no way to ever pause it.
        clearInterval(intervalId)
        intervalId = setInterval(
            () => rxInterval.next(
                rxInterval.value + rxInterval.incrementBy
            ),
            // read from the subject so that changes to the exposed `delay` property take effect on (re)start
            rxInterval.delay,
        )
    }
    rxInterval.stop = () => {
        rxInterval.pause()
        rxInterval.next(0)
    }
    autoStart && rxInterval.start()
    return rxInterval
}
/**
 * @name    subjectAsPromise
 * @summary sugar for RxJS subject as promise and, optionally, wait until an expected value is received
 *
 * @param   {Subject}    subject       RxJS subject or similar subscribable
 * @param   {*|Function} expectedValue (optional) if undefined, will resolve as soon as any value is received.
 *                                     If function, it should return true or false to indicate whether the value
 *                                     should be resolved.
 *                                     Use `subjectAsPromise.anyValueSymbol` to resolve with the first value
 *                                     accepted by `hasValue()`.
 * @param   {Number}     timeout       (optional) duration in milliseconds. Will reject if no expected value is
 *                                     received within given time. Ignored unless it is a positive number.
 * @param   {String}     timeoutMsg    (optional) error message to use when times out.
 *                                     Default: 'Request timed out before an expected value is received'
 *                                     Deprecated: if a function is supplied (legacy `modifier` argument), it is
 *                                     ignored, a warning is logged and `timeoutMsg2` is used as the message.
 * @param   {String}     timeoutMsg2   (optional) timeout message used only when `timeoutMsg` is a function.
 *
 * @returns {[Promise, Function]|undefined} `[promise, unsubscribe]`
 *  - `undefined` is returned (instead of an array) if `subject` is not a valid RxJS subject like subscribable
 *  - `promise` resolves with the first expected value
 *  - `promise` rejects with the timeout message if timed out
 *  - `promise` rejects with the error if the `expectedValue` function throws
 *  - `unsubscribe()` stops listening and cancels the timeout. The promise is left pending.
 */
export const subjectAsPromise = (
    subject,
    expectedValue,
    timeout,
    timeoutMsg = textsCap.timedout,
    timeoutMsg2
) => {
    if (!isSubjectLike(subject)) return
    if (isFn(timeoutMsg)) {
        console.warn('utils/rx.js => subjectAsPromise: `modifier` deprecated! Use `promise.then()` instead.')
        timeoutMsg = timeoutMsg2 || textsCap.timedout
    }

    let subscription, timeoutId
    let unsubscribed = false
    const release = () => {
        if (unsubscribed) return
        unsubscribed = true
        clearTimeout(timeoutId)
        // `subscription` is not assigned yet when the subject emits synchronously from within `subscribe()`.
        // That case is taken care of right after `subscribe()` returns.
        subscription && subscription.unsubscribe()
    }
    const isExpected = value => (isFn(expectedValue) && expectedValue(value))
        // no expected value set. resolve with first value received
        || expectedValue === undefined
        // exact match
        || value === expectedValue
        // resolve only when `subject` is NOT empty, null, NaN etc. Check `hasValue` for details.
        || (expectedValue === subjectAsPromise.anyValueSymbol && hasValue(value))

    const promise = new PromisE((resolve, reject) => {
        subscription = subject.subscribe(value => {
            if (unsubscribed) return
            let shouldResolve
            try {
                shouldResolve = isExpected(value)
            } catch (err) {
                // a throwing matcher would otherwise leave the promise pending forever
                release()
                return reject(err)
            }
            if (!shouldResolve) return
            release()
            resolve(value)
        })
        if (unsubscribed) {
            // already settled/cancelled during `subscribe()`: release the subscription, no timeout needed
            subscription.unsubscribe()
            return
        }
        timeoutId = isPositiveNumber(timeout) && setTimeout(() => {
            // prevent rejecting if already unsubscribed
            if (unsubscribed) return
            release()
            reject(timeoutMsg)
        }, timeout)
    })
    return [promise, release]
}
subjectAsPromise.anyValueSymbol = Symbol('any-value')
/**
 * @name    unsubscribe
 * @summary unsubscribe from one or more RxJS subscriptions
 *
 * @param   {Function|Unsubscribable|Array|Object} unsub (optional) one of the following:
 *  - a function: invoked without arguments and its result is returned
 *  - an object with an `unsubscribe()` method: the method is invoked and its result is returned
 *  - an array or plain object: each of its values is handled as described above on a best-effort basis.
 *      Falsy entries are skipped and errors thrown by individual entries are ignored.
 *  - any falsy value: nothing is done
 */
export const unsubscribe = (unsub = {}) => {
    // nothing to unsubscribe from (`null` is not replaced by the default parameter value)
    if (!unsub) return
    // single function supplied
    if (isFn(unsub)) return unsub()
    // single
    if (isFn(unsub.unsubscribe)) return unsub.unsubscribe()
    // multi
    Object
        .values(unsub)
        .forEach(x => {
            try {
                if (!x) return
                if (isFn(x)) return x()
                // Invoke as a method: a detached `x.unsubscribe` loses its `this`, which makes class-based
                // subscriptions throw. The error would be swallowed below and nothing would be unsubscribed.
                if (isFn(x.unsubscribe)) x.unsubscribe()
            } catch (_) { } // best-effort: a failing entry must not prevent the remaining ones from unsubscribing
        })
}