From 6a18fa293b3c7d8c89558af39dd332824c7af12e Mon Sep 17 00:00:00 2001 From: Simon Chan <1330321+yume-chan@users.noreply.github.com> Date: Sun, 13 Oct 2024 01:54:22 +0800 Subject: [PATCH] fix(stream): remove `UnwrapStream`, it doesn't work --- libraries/adb-server-node-tcp/src/index.ts | 8 ++------ libraries/adb/src/commands/sync/push.ts | 4 ++-- libraries/adb/src/commands/sync/socket.ts | 4 ++-- libraries/adb/src/server/client.ts | 13 +++++-------- libraries/adb/src/server/transport.ts | 4 +--- .../stream-extra/src/maybe-consumable-ns.ts | 17 +---------------- 6 files changed, 13 insertions(+), 37 deletions(-) diff --git a/libraries/adb-server-node-tcp/src/index.ts b/libraries/adb-server-node-tcp/src/index.ts index 2e3dfaee4..7df1b1217 100644 --- a/libraries/adb-server-node-tcp/src/index.ts +++ b/libraries/adb-server-node-tcp/src/index.ts @@ -6,8 +6,6 @@ import { MaybeConsumable, PushReadableStream, tryClose, - WrapWritableStream, - WritableStream, } from "@yume-chan/stream-extra"; import type { ValueOrPromise } from "@yume-chan/struct"; @@ -36,7 +34,7 @@ function nodeSocketToConnection( tryClose(controller); }); }), - writable: new WritableStream({ + writable: new MaybeConsumable.WritableStream({ write: (chunk) => { return new Promise((resolve, reject) => { socket.write(chunk, (err) => { @@ -100,9 +98,7 @@ export class AdbServerNodeTcpConnector await handler({ service: address!, readable: connection.readable, - writable: new WrapWritableStream( - connection.writable, - ).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()), + writable: connection.writable, get closed() { return connection.closed; }, diff --git a/libraries/adb/src/commands/sync/push.ts b/libraries/adb/src/commands/sync/push.ts index 7dd32807a..206f2e045 100644 --- a/libraries/adb/src/commands/sync/push.ts +++ b/libraries/adb/src/commands/sync/push.ts @@ -41,8 +41,8 @@ async function pipeFileData( file.pipeThrough(new DistributionStream(packetSize, true)) .pipeTo( new MaybeConsumable.WritableStream({ - write: async (chunk) => { - await adbSyncWriteRequest( + write(chunk) { + return adbSyncWriteRequest( locked, AdbSyncRequestId.Data, chunk, diff --git a/libraries/adb/src/commands/sync/socket.ts b/libraries/adb/src/commands/sync/socket.ts index 3ef72429b..0ca3437db 100644 --- a/libraries/adb/src/commands/sync/socket.ts +++ b/libraries/adb/src/commands/sync/socket.ts @@ -35,9 +35,9 @@ export class AdbSyncSocketLocked implements AsyncExactReadable { this.#combiner = new BufferCombiner(bufferSize); } - async #write(buffer: Uint8Array) { + #write(buffer: Uint8Array) { // `#combiner` will reuse the buffer, so we need to use the Consumable pattern - await Consumable.WritableStream.write(this.#writer, buffer); + return Consumable.WritableStream.write(this.#writer, buffer); } async flush() { diff --git a/libraries/adb/src/server/client.ts b/libraries/adb/src/server/client.ts index d2519bd51..2e454c6ce 100644 --- a/libraries/adb/src/server/client.ts +++ b/libraries/adb/src/server/client.ts @@ -6,11 +6,10 @@ import type { AbortSignal, ReadableWritablePair, WritableStreamDefaultWriter, + MaybeConsumable, } from "@yume-chan/stream-extra"; import { BufferedReadableStream, - MaybeConsumable, - WrapWritableStream, tryCancel, tryClose, } from "@yume-chan/stream-extra"; @@ -211,8 +210,8 @@ export class AdbServerClient { readonly wireless = new AdbServerClient.WirelessCommands(this); readonly mDns = new AdbServerClient.MDnsCommands(this); - constructor(connection: AdbServerClient.ServerConnector) { - this.connector = connection; + constructor(connector: AdbServerClient.ServerConnector) { + this.connector = connector; } async createConnection( @@ -437,9 +436,7 @@ export class AdbServerClient { transportId, service, readable: socket.readable, - writable: new WrapWritableStream( - socket.writable, - ).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()), + writable: socket.writable, get closed() { return socket.closed; }, @@ -567,7 +564,7 @@ export namespace AdbServerClient { } export interface ServerConnection - extends ReadableWritablePair, + extends ReadableWritablePair>, Closeable { get closed(): Promise; } diff --git a/libraries/adb/src/server/transport.ts b/libraries/adb/src/server/transport.ts index 7f49210a4..3eceb9374 100644 --- a/libraries/adb/src/server/transport.ts +++ b/libraries/adb/src/server/transport.ts @@ -76,9 +76,7 @@ export class AdbServerTransport implements AdbTransport { async connect(service: string): Promise { return await this.#client.createDeviceConnection( - { - transportId: this.transportId, - }, + { transportId: this.transportId }, service, ); } diff --git a/libraries/stream-extra/src/maybe-consumable-ns.ts b/libraries/stream-extra/src/maybe-consumable-ns.ts index bc4de1eef..fd53a56a4 100644 --- a/libraries/stream-extra/src/maybe-consumable-ns.ts +++ b/libraries/stream-extra/src/maybe-consumable-ns.ts @@ -4,10 +4,7 @@ import type { QueuingStrategy, WritableStreamDefaultController, } from "./stream.js"; -import { - WritableStream as NativeWritableStream, - TransformStream, -} from "./stream.js"; +import { WritableStream as NativeWritableStream } from "./stream.js"; export function getValue(value: MaybeConsumable): T { return value instanceof Consumable ? value.value : value; @@ -24,18 +21,6 @@ export function tryConsume( } } -export class UnwrapStream extends TransformStream, T> { - constructor() { - super({ - transform(chunk, controller) { - tryConsume(chunk, (chunk) => { - controller.enqueue(chunk as T); - }); - }, - }); - } -} - export interface WritableStreamSink { start?( controller: WritableStreamDefaultController,