Skip to content

Commit

Permalink
Support less prominent streaming methods: client-streaming and bidire…
Browse files Browse the repository at this point in the history
…ctional streaming
  • Loading branch information
segfault16 committed Oct 13, 2023
1 parent 39b44ff commit 86f0caa
Showing 1 changed file with 89 additions and 46 deletions.
135 changes: 89 additions & 46 deletions packages/nice-grpc-client-middleware-devtools/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { isAbortError } from 'abort-controller-x';
import {isAbortError} from 'abort-controller-x';
import {
CallOptions,
ClientError,
Expand All @@ -10,7 +10,7 @@ import {
export type DevtoolsLoggingOptions = {
/**
* Skip logging abort errors.
*
*
* By default, abort errors are logged.
*/
skipAbortErrorLogging?: boolean;
Expand All @@ -19,15 +19,15 @@ export type DevtoolsLoggingOptions = {
export const devtoolsUnaryLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOptions> =
async function* devtoolsLoggingMiddleware<Request, Response>(
call: ClientMiddlewareCall<Request, Response>,
options: CallOptions & Partial<DevtoolsLoggingOptions>
options: CallOptions & Partial<DevtoolsLoggingOptions>,
): AsyncGenerator<Response, Response | void, undefined> {
// skip streaming calls
if (call.requestStream || call.responseStream) {
return yield* call.next(call.request, options);
}

// log unary calls
const { path } = call.method;
const {path} = call.method;
const reqObj = getAsObject(call.request);

try {
Expand All @@ -41,7 +41,7 @@ export const devtoolsUnaryLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOpt
response: resObj,
type: '__GRPCWEB_DEVTOOLS__',
},
'*'
'*',
);
return result;
} catch (error) {
Expand All @@ -59,7 +59,7 @@ export const devtoolsUnaryLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOpt
request: reqObj,
type: '__GRPCWEB_DEVTOOLS__',
},
'*'
'*',
);
} else if (isAbortError(error) && error instanceof Error) {
if (!options.skipAbortErrorLogging) {
Expand All @@ -76,7 +76,7 @@ export const devtoolsUnaryLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOpt
request: reqObj,
type: '__GRPCWEB_DEVTOOLS__',
},
'*'
'*',
);
}
} else if (error instanceof Error) {
Expand All @@ -93,7 +93,7 @@ export const devtoolsUnaryLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOpt
request: reqObj,
type: '__GRPCWEB_DEVTOOLS__',
},
'*'
'*',
);
}

Expand All @@ -104,46 +104,42 @@ export const devtoolsUnaryLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOpt
export const devtoolsStreamLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOptions> =
async function* devtoolsLoggingMiddleware<Request, Response>(
call: ClientMiddlewareCall<Request, Response>,
options: CallOptions & Partial<DevtoolsLoggingOptions>
options: CallOptions & Partial<DevtoolsLoggingOptions>,
): AsyncGenerator<Response, Response | void, undefined> {
// skip unary calls
if (!call.responseStream && !call.requestStream) {
return yield* call.next(call.request, options);
}

// log streaming calls
const { path } = call.method;
const reqObj = getAsObject(call.request);
const {path} = call.method;

let first = true;
try {
for await (const response of call.next(call.request, options)) {
const resObj = getAsObject(response);
if (first) {
// log the request object only once
window.postMessage(
{
method: path,
methodType: 'server_streaming',
request: reqObj,
type: '__GRPCWEB_DEVTOOLS__',
},
'*'
);
first = false;
if (!call.requestStream) {
// server streaming, the most prominent streaming option in grpc-web
let first = true;
for await (const response of call.next(call.request, options)) {
if (first) {
// log the request object only once and after the first response to not have duplicate logs in case of an error
logStreamingRequestMessage(call.request, path);
first = false;
}
logStreamingResponseMessage(response, path);
yield response;
}
return;
} else {
const request = emitRequestMessages(call.request, path);
if (!call.responseStream) {
// client streaming
const response = yield* call.next(request, options);
logStreamingResponseMessage(response, path);
return response;
} else {
// bidirectional streaming
yield* emitResponseMessages(call.next(request, options), path);
return;
}
// log the response
window.postMessage(
{
method: path,
methodType: 'server_streaming',
response: resObj,
type: '__GRPCWEB_DEVTOOLS__',
},
'*'
);

yield response;
}
} catch (error) {
if (error instanceof ClientError) {
Expand All @@ -159,7 +155,7 @@ export const devtoolsStreamLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOp
methodType: 'server_streaming',
type: '__GRPCWEB_DEVTOOLS__',
},
'*'
'*',
);
} else if (isAbortError(error) && error instanceof Error) {
if (!options.skipAbortErrorLogging) {
Expand All @@ -175,7 +171,7 @@ export const devtoolsStreamLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOp
methodType: 'server_streaming',
type: '__GRPCWEB_DEVTOOLS__',
},
'*'
'*',
);
}
} else if (error instanceof Error) {
Expand All @@ -191,18 +187,19 @@ export const devtoolsStreamLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOp
methodType: 'server_streaming',
type: '__GRPCWEB_DEVTOOLS__',
},
'*'
'*',
);
}

throw error;
}
};

export const devtoolsLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOptions> = composeClientMiddleware(
devtoolsUnaryLoggingMiddleware,
devtoolsStreamLoggingMiddleware
);
export const devtoolsLoggingMiddleware: ClientMiddleware<DevtoolsLoggingOptions> =
composeClientMiddleware(
devtoolsUnaryLoggingMiddleware,
devtoolsStreamLoggingMiddleware,
);

// check whether the given object has toObject() method and return the object
// otherwise return the object itself
Expand All @@ -213,4 +210,50 @@ function getAsObject(obj: any) {
}
// ts-proto
return obj;
}
}

async function* emitRequestMessages<T>(
iterable: AsyncIterable<T>,
path: string,
): AsyncIterable<T> {
for await (const request of iterable) {
logStreamingRequestMessage(request, path);
yield request;
}
}

async function* emitResponseMessages<T>(
iterable: AsyncIterable<T>,
path: string,
): AsyncIterable<T> {
for await (const reponse of iterable) {
logStreamingResponseMessage(reponse, path);
yield reponse;
}
}

function logStreamingResponseMessage<T>(response: T, path: string) {
const resObj = getAsObject(response);
window.postMessage(
{
method: path,
methodType: 'server_streaming',
response: resObj,
type: '__GRPCWEB_DEVTOOLS__',
},
'*',
);
}

function logStreamingRequestMessage<T>(request: T, path: string) {
const reqObj = getAsObject(request);
window.postMessage(
{
method: path,
methodType: 'server_streaming',
request: reqObj,
type: '__GRPCWEB_DEVTOOLS__',
},
'*',
);
}

0 comments on commit 86f0caa

Please sign in to comment.