continue processing on error #7293
I've been able to find quite a few examples of catchError and onErrorResumeNext, as well as the docs, but nothing seems to work in my case, so I must be misunderstanding something. I think the purpose of this snippet is apparent, but the sticking point is continuing after hitting a parse failure.

const readNDJSONLines = (filePath: string): Observable<CaseRecord> =>
  readLineObs(filePath).pipe(
    filter((line) => line.trim().length > 0),
    map((line) => {
      try {
        return JSON.parse(line) as CaseRecord;
      } catch (err) {
        console.error(`Failed to parse line: ${line}`);
        throw err;
      }
    }),
    catchError(() => of(null as unknown as CaseRecord)),
    filter((record) => record !== null)
  );

A few things I tried:

catchError((err) => of(EMPTY))
onErrorResumeNext(EMPTY)
catchError(() => EMPTY)
onErrorResumeNext(readLineObs(filePath).pipe(......

What is the right way to continue (readLineObs()) after throwing an error upstream in the pipe?
Replies: 1 comment 1 reply
There's no way :) Emitting an error closes all the subscriptions, by design. Any operator you add afterwards can't change this behaviour; it can only react after the subscriptions are closed.

How to work around that depends on the exact use case. From your example, you could easily change it to just:

const readNDJSONLines = (filePath: string): Observable<CaseRecord> =>
  readLineObs(filePath).pipe(
    filter((line) => line.trim().length > 0),
    map((line) => {
      try {
        return JSON.parse(line) as CaseRecord;
      } catch (err) {
        console.error(`Failed to parse line: ${line}`);
        return null;
      }
    }),
    filter((record) => record !== null)
  );

i.e. there's no need to throw an error if all you do later is map it to null and filter it out.

Another trick I sometimes use is to think about where to put the catchError. This works well if you're using switchMap, etc.:

idsToRequest$.pipe(
  concatMap(id => requestData$(id).pipe(
    catchError(() => of(null)) // or EMPTY or whatever
  ))
)

This way the observable that dies is the inner one. If I were to put the catchError outside, the outer stream would be the one that dies.
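For what it's worth, here is a minimal sketch of that inner-versus-outer difference, assuming RxJS 7. The definitions of `idsToRequest$` and `requestData$` below are hypothetical stand-ins (the request for id 2 always fails), not anything from the original code.

```typescript
import { EMPTY, Observable, catchError, concatMap, from, of, throwError } from 'rxjs';

// Hypothetical stand-in for requestData$: fails for id 2, succeeds otherwise.
const requestData$ = (id: number): Observable<string> =>
  id === 2 ? throwError(() => new Error(`request ${id} failed`)) : of(`data-${id}`);

// Hypothetical stand-in for idsToRequest$.
const idsToRequest$ = from([1, 2, 3]);

// catchError inside concatMap: only the inner observable for id 2 dies,
// so the outer stream moves on to id 3.
const inner: string[] = [];
idsToRequest$
  .pipe(concatMap((id) => requestData$(id).pipe(catchError(() => EMPTY))))
  .subscribe((value) => inner.push(value));

// catchError after concatMap: the error tears down the whole chain,
// so id 3 is never requested.
const outer: string[] = [];
idsToRequest$
  .pipe(
    concatMap((id) => requestData$(id)),
    catchError(() => EMPTY)
  )
  .subscribe((value) => outer.push(value));

console.log(inner); // ['data-1', 'data-3']
console.log(outer); // ['data-1']
```

The same idea should carry over to the line-parsing case: wrap the part that can fail in its own inner observable and catch there, so the line source itself never sees an error.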
Operators added after the error can only react once the subscriptions are closed:

- catchError lets you return a new observable that should be used after the error has happened,
- onErrorResumeNextWith lets you define a list of observables to be used as they keep erroring (like a concatWith, but for errors),
- retry (and variants) lets you re-subscribe to the observable that has just errored (but it will usually start again from the beginning).
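To make the "react after the fact" point concrete, here is a small sketch, again assuming RxJS 7. `makeFlakySource` is a hypothetical helper invented for illustration: it emits 1 and 2, errors on its first subscription, and completes normally on any later one.

```typescript
import { Observable, catchError, of, retry } from 'rxjs';

// Hypothetical source: emits 1, 2, then errors on the first subscription.
// On later subscriptions it emits 1, 2, 3 and completes.
const makeFlakySource = (): Observable<number> => {
  let attempts = 0;
  return new Observable<number>((subscriber) => {
    attempts += 1;
    subscriber.next(1);
    subscriber.next(2);
    if (attempts === 1) {
      subscriber.error(new Error('boom'));
      return;
    }
    subscriber.next(3);
    subscriber.complete();
  });
};

// catchError: the errored source is gone; the replacement observable takes over.
const withCatch: number[] = [];
makeFlakySource()
  .pipe(catchError(() => of(-1)))
  .subscribe((value) => withCatch.push(value));

// retry: re-subscribes to the source, which starts again from the beginning.
const withRetry: number[] = [];
makeFlakySource()
  .pipe(retry(1))
  .subscribe((value) => withRetry.push(value));

console.log(withCatch); // [1, 2, -1]
console.log(withRetry); // [1, 2, 1, 2, 3]
```

Neither operator resumes the original subscription where it left off: catchError swaps in something else, and retry replays from the start.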