Skip to content

Commit

Permalink
fix(stream): remove UnwrapStream, it doesn't work
Browse files Browse the repository at this point in the history
  • Loading branch information
yume-chan committed Oct 12, 2024
1 parent d862077 commit 6a18fa2
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 37 deletions.
8 changes: 2 additions & 6 deletions libraries/adb-server-node-tcp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import {
MaybeConsumable,
PushReadableStream,
tryClose,
WrapWritableStream,
WritableStream,
} from "@yume-chan/stream-extra";
import type { ValueOrPromise } from "@yume-chan/struct";

Expand Down Expand Up @@ -36,7 +34,7 @@ function nodeSocketToConnection(
tryClose(controller);
});
}),
writable: new WritableStream<Uint8Array>({
writable: new MaybeConsumable.WritableStream<Uint8Array>({
write: (chunk) => {
return new Promise<void>((resolve, reject) => {
socket.write(chunk, (err) => {
Expand Down Expand Up @@ -100,9 +98,7 @@ export class AdbServerNodeTcpConnector
await handler({
service: address!,
readable: connection.readable,
writable: new WrapWritableStream(
connection.writable,
).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()),
writable: connection.writable,
get closed() {
return connection.closed;
},
Expand Down
4 changes: 2 additions & 2 deletions libraries/adb/src/commands/sync/push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ async function pipeFileData(
file.pipeThrough(new DistributionStream(packetSize, true))
.pipeTo(
new MaybeConsumable.WritableStream({
write: async (chunk) => {
await adbSyncWriteRequest(
write(chunk) {
return adbSyncWriteRequest(
locked,
AdbSyncRequestId.Data,
chunk,
Expand Down
4 changes: 2 additions & 2 deletions libraries/adb/src/commands/sync/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ export class AdbSyncSocketLocked implements AsyncExactReadable {
this.#combiner = new BufferCombiner(bufferSize);
}

async #write(buffer: Uint8Array) {
#write(buffer: Uint8Array) {
// `#combiner` will reuse the buffer, so we need to use the Consumable pattern
await Consumable.WritableStream.write(this.#writer, buffer);
return Consumable.WritableStream.write(this.#writer, buffer);
}

async flush() {
Expand Down
13 changes: 5 additions & 8 deletions libraries/adb/src/server/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import type {
AbortSignal,
ReadableWritablePair,
WritableStreamDefaultWriter,
MaybeConsumable,
} from "@yume-chan/stream-extra";
import {
BufferedReadableStream,
MaybeConsumable,
WrapWritableStream,
tryCancel,
tryClose,
} from "@yume-chan/stream-extra";
Expand Down Expand Up @@ -211,8 +210,8 @@ export class AdbServerClient {
readonly wireless = new AdbServerClient.WirelessCommands(this);
readonly mDns = new AdbServerClient.MDnsCommands(this);

constructor(connection: AdbServerClient.ServerConnector) {
this.connector = connection;
constructor(connector: AdbServerClient.ServerConnector) {
this.connector = connector;
}

async createConnection(
Expand Down Expand Up @@ -437,9 +436,7 @@ export class AdbServerClient {
transportId,
service,
readable: socket.readable,
writable: new WrapWritableStream(
socket.writable,
).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()),
writable: socket.writable,
get closed() {
return socket.closed;
},
Expand Down Expand Up @@ -567,7 +564,7 @@ export namespace AdbServerClient {
}

export interface ServerConnection
extends ReadableWritablePair<Uint8Array, Uint8Array>,
extends ReadableWritablePair<Uint8Array, MaybeConsumable<Uint8Array>>,
Closeable {
get closed(): Promise<void>;
}
Expand Down
4 changes: 1 addition & 3 deletions libraries/adb/src/server/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ export class AdbServerTransport implements AdbTransport {

async connect(service: string): Promise<AdbSocket> {
return await this.#client.createDeviceConnection(
{
transportId: this.transportId,
},
{ transportId: this.transportId },
service,
);
}
Expand Down
17 changes: 1 addition & 16 deletions libraries/stream-extra/src/maybe-consumable-ns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ import type {
QueuingStrategy,
WritableStreamDefaultController,
} from "./stream.js";
import {
WritableStream as NativeWritableStream,
TransformStream,
} from "./stream.js";
import { WritableStream as NativeWritableStream } from "./stream.js";

export function getValue<T>(value: MaybeConsumable<T>): T {
return value instanceof Consumable ? value.value : value;
Expand All @@ -24,18 +21,6 @@ export function tryConsume<T, R>(
}
}

export class UnwrapStream<T> extends TransformStream<MaybeConsumable<T>, T> {
constructor() {
super({
transform(chunk, controller) {
tryConsume(chunk, (chunk) => {
controller.enqueue(chunk as T);
});
},
});
}
}

export interface WritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
Expand Down

0 comments on commit 6a18fa2

Please sign in to comment.