Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming #68

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 4.1.0
- Added streaming support via sendStreamPayload function.

## 4.0.1
- Updated nearby connections (`play-services-nearby:18.4.0` -> `play-services-nearby:18.7.0`)

Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ Nearby().sendBytesPayload(endpointId, bytes_array);

// payloads are recieved by callback given to acceptConnection method.
```

### Sending Stream Payload

```dart
Nearby().sendStreamPayload(endpointId, bytes_array);

// payloads are recieved by callback given to acceptConnection method.
// Payload is received as bytes, and is available in payload.bytes attribute
```

### Sending File Payload
You need to send the File Payload and File Name seperately.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.pkmnapps.nearby_connections;

import android.app.Activity;
import android.os.ParcelFileDescriptor;
import android.util.Log;
import android.os.Build.VERSION;
import android.os.Build.VERSION_CODES;
Expand All @@ -21,8 +22,10 @@
import com.google.android.gms.nearby.connection.PayloadCallback;
import com.google.android.gms.nearby.connection.PayloadTransferUpdate;
import com.google.android.gms.nearby.connection.Strategy;
import com.google.android.gms.tasks.OnCompleteListener;
import com.google.android.gms.tasks.OnFailureListener;
import com.google.android.gms.tasks.OnSuccessListener;
import com.google.android.gms.tasks.Task;

import java.io.File;
import java.io.FileNotFoundException;
Expand All @@ -38,6 +41,7 @@
import io.flutter.embedding.engine.plugins.activity.ActivityPluginBinding;
import io.flutter.plugin.common.MethodCall;
import io.flutter.plugin.common.MethodChannel;
import io.flutter.plugin.common.EventChannel;
import io.flutter.plugin.common.MethodChannel.MethodCallHandler;
import io.flutter.plugin.common.MethodChannel.Result;
import io.flutter.plugin.common.PluginRegistry;
Expand All @@ -50,12 +54,54 @@ public class NearbyConnectionsPlugin implements MethodCallHandler, FlutterPlugin
private Activity activity;
private static final String SERVICE_ID = "com.pkmnapps.nearby_connections";
private static MethodChannel channel;
private static EventChannel eventChannel;
private static PluginRegistry.Registrar pluginRegistrar;

private static NearbyConnectionsStreamHandler nearbyConnectionsStreamHandler;
private static ParcelFileDescriptor[] senderPayloadPipe = null;
private static OutputStream senderOutputStream = null;
private static InputStream senderInputStream = null;
private static Thread senderOutputStreamThread = null;
private volatile byte[] bytes = null;
private volatile boolean isPipeOpen = false;
private NearbyConnectionsPlugin(Activity activity) {
this.activity = activity;
}
public NearbyConnectionsPlugin(){}
public NearbyConnectionsPlugin(){
}

// Stream Handler for event channel for receiver side
static class NearbyConnectionsStreamHandler implements EventChannel.StreamHandler {

public InputStream inputReceiverStream;
private EventChannel.EventSink eventSink;
@Override
public void onListen(Object arguments, EventChannel.EventSink events) {
if (events == null) return;
eventSink = events;
}

@Override
public void onCancel(Object arguments) {
}

public void initializeInputReceiverStream(InputStream inputStream) {
inputReceiverStream = inputStream;
byte[] bytes;
while (true) {
try {
if (inputReceiverStream.available() <= 0) break;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its possible the while loop ends up being faster than the stream speed and we would end up no longer listening to the receiver stream.

I think we should follow the android docs here : https://developers.google.com/nearby/connections/android/exchange-data#stream:~:text=SystemClock.elapsedRealtime()%20%2D%20lastRead)%20%3E%3D%20READ_STREAM_IN_BG_TIMEOUT

Basically, a configured timeout value (READ_STREAM_IN_BG_TIMEOUT) after which the stream could be considered dead.

We should also let the user change the timeout (maybe a method channel call to update the stream timeout)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also on timeout we should call events.endOfStream()

bytes = new byte[inputReceiverStream.read()];
inputReceiverStream.read(bytes);
if(eventSink != null) {
eventSink.success(bytes);
}
} catch (IOException e) {
e.printStackTrace();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be replaced with eventSink.error("Stream Error", "IO Exception", null) and a Log.e(...)

}
}
}
}


/**
* Legacy Plugin registration.
Expand All @@ -65,6 +111,9 @@ public static void registerWith(Registrar registrar) {
pluginRegistrar = registrar;
channel = new MethodChannel(registrar.messenger(), "nearby_connections");
channel.setMethodCallHandler(new NearbyConnectionsPlugin(registrar.activity()));
eventChannel = new EventChannel(registrar.messenger(), "nearby_connections/stream");
nearbyConnectionsStreamHandler = new NearbyConnectionsStreamHandler();
eventChannel.setStreamHandler(nearbyConnectionsStreamHandler);
}

@Override
Expand Down Expand Up @@ -260,6 +309,77 @@ public void onFailure(@NonNull Exception e) {
}
break;
}
case "addToSenderStream": {
final String endpointId = (String) call.argument("endpointId");
bytes = (byte[]) call.argument("bytes");
assert endpointId != null;
assert bytes != null;
if (!isPipeOpen && senderPayloadPipe == null) {
try {
senderPayloadPipe = ParcelFileDescriptor.createPipe();
// 0 is for reading
// 1 is for writing
senderInputStream = new ParcelFileDescriptor.AutoCloseInputStream(senderPayloadPipe[0]);
Nearby.getConnectionsClient(activity).sendPayload(endpointId, Payload.fromStream(senderInputStream)).addOnSuccessListener(new OnSuccessListener<Void>() {
@Override
public void onSuccess(Void aVoid) {

}
}).addOnFailureListener(new OnFailureListener() {
@Override
public void onFailure(@NonNull Exception e) {
Log.d("nearby_connections", "Stream Failed");
Log.d("nearby_connections", e.getMessage());
try {
senderPayloadPipe[0].close();
senderPayloadPipe[1].close();
} catch (IOException ex) {
ex.printStackTrace();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace all e.printStackTrace calls with Log.e("nearby_connections", "IO Exception at <place>", e)

} finally {
isPipeOpen = false;
senderPayloadPipe = null;
senderInputStream = null;
senderOutputStream = null;
}

}
}).addOnCompleteListener(new OnCompleteListener<Void>() {
@Override
public void onComplete(@NonNull Task<Void> task) {
}
});
senderOutputStream = new ParcelFileDescriptor.AutoCloseOutputStream(senderPayloadPipe[1]);
isPipeOpen = true;
} catch (IOException e) {
Log.d("nearby_connections", "Error while creating pipe");
Log.d("nearby_connections", e.getMessage());
senderPayloadPipe = null;
}
}
try {
if (senderOutputStream != null && isPipeOpen && senderInputStream.available() > 0) {
Log.d("nearby_connections", "Adding to stream in line 2");
senderOutputStream.write(bytes);
senderOutputStream.flush();
} else if (senderInputStream.available() <= 0) {
Log.d("nearby_connections", "Closing pipe");
senderPayloadPipe[0].close();
senderPayloadPipe[1].close();
isPipeOpen = false;
senderPayloadPipe = null;
senderInputStream = null;
senderOutputStream = null;
}
} catch (IOException e) {
isPipeOpen = false;
senderInputStream = null;
senderOutputStream = null;
senderPayloadPipe = null;
Log.d("nearby_connections", e.getMessage());
}
result.success(true);
break;
}
case "cancelPayload": {
String payloadId = (String) call.argument("payloadId");
assert payloadId != null;
Expand Down Expand Up @@ -386,6 +506,10 @@ public void onPayloadReceived(@NonNull String endpointId, @NonNull Payload paylo
// This is deprecated and only available on Android 10 and below.
args.put("filePath", payload.asFile().asJavaFile().getAbsolutePath());
}
} else if (payload.getType() == Payload.Type.STREAM) {
InputStream inputStream = payload.asStream().asInputStream();
Log.d("nearby_connections", "Input Stream Received");
nearbyConnectionsStreamHandler.initializeInputReceiverStream(inputStream);
}

channel.invokeMethod("onPayloadReceived", args);
Expand Down Expand Up @@ -446,6 +570,9 @@ private Strategy getStrategy(int strategy) {
public void onAttachedToEngine(@NonNull FlutterPluginBinding binding) {
channel = new MethodChannel(binding.getBinaryMessenger(), "nearby_connections");
channel.setMethodCallHandler(this);
eventChannel = new EventChannel(binding.getBinaryMessenger(), "nearby_connections/stream");
nearbyConnectionsStreamHandler = new NearbyConnectionsStreamHandler();
eventChannel.setStreamHandler(nearbyConnectionsStreamHandler);
}

@Override
Expand Down
12 changes: 12 additions & 0 deletions example/lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,18 @@ class _MyBodyState extends State<Body> {
}
},
),
ElevatedButton(
child: const Text("Send Stream Payload"),
onPressed: () async {
endpointMap.forEach((key, value) {
String a = Random().nextInt(100).toString();

showSnackbar("Sending $a to ${value.endpointName}, id: $key");
Nearby()
.sendStreamPayload(key, Uint8List.fromList(a.codeUnits));
});
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not really a usecase for streams, nor it depicts how to send an endless stream of data.

See https://github.com/googlearchive/android-nearby/blob/master/connections/walkietalkie/app/src/automatic/java/com/google/location/nearby/apps/walkietalkie/MainActivity.java

the linked example uses audio, but if we want to avoid that, we can instead ask the user to hold the button, and send random bytes of payload while the button is on hold. Basically an example of sending a stream of data without any known size

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mannprerak2 Bro, have you tested the streams in Java code yourself? Because, the streaming only works for the first time we use Nearby.getConnectionsClient(activity).sendPayload(endpointId, Payload.fromStream(inputStream)). If I do the same for the second time, the streaming doesn't work. Here is the code which I've written for the logic. https://github.com/lakshmanprabhu49/nearby_connections/tree/Streaming_v2

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can push commits to this PR itself.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mannprerak2 I've pushed the code here. Can you review? As far as I tested in my local, the stream was sent from sender to receiver. But not sure if this will work in all scenarios

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added more comments

Copy link
Owner

@mannprerak2 mannprerak2 Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also change the example to send maybe a stream of numbers at a gap of 1 second. E.g -

Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(5)
      .listen((i) {
        Nearby().sendStreamPayload(key, Uint8List.fromList(i.toString().codeUnits));
      });

Also, the receiver side should show a short snackbar (of duration 0.9 seconds) for every received stream

},
),
ElevatedButton(
child: const Text("Print file names."),
onPressed: () async {
Expand Down
32 changes: 20 additions & 12 deletions example/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ packages:
dependency: transitive
description:
name: collection
sha256: "4a07be6cb69c84d677a6c3096fcf960cc3285a8330b4603e0d463d15d9bd934c"
sha256: f092b211a4319e98e5ff58223576de6c2803db36221657b46c82574721240687
url: "https://pub.dev"
source: hosted
version: "1.17.1"
version: "1.17.2"
cross_file:
dependency: transitive
description:
Expand Down Expand Up @@ -172,18 +172,18 @@ packages:
dependency: transitive
description:
name: matcher
sha256: "6501fbd55da300384b768785b83e5ce66991266cec21af89ab9ae7f5ce1c4cbb"
sha256: "1803e76e6653768d64ed8ff2e1e67bea3ad4b923eb5c56a295c3e634bad5960e"
url: "https://pub.dev"
source: hosted
version: "0.12.15"
version: "0.12.16"
material_color_utilities:
dependency: transitive
description:
name: material_color_utilities
sha256: d92141dc6fe1dad30722f9aa826c7fbc896d021d792f80678280601aff8cf724
sha256: "9528f2f296073ff54cb9fee677df673ace1218163c3bc7628093e7eed5203d41"
url: "https://pub.dev"
source: hosted
version: "0.2.0"
version: "0.5.0"
meta:
dependency: transitive
description:
Expand All @@ -198,7 +198,7 @@ packages:
path: ".."
relative: true
source: path
version: "4.0.0"
version: "4.0.1"
path:
dependency: transitive
description:
Expand Down Expand Up @@ -328,10 +328,10 @@ packages:
dependency: transitive
description:
name: source_span
sha256: dd904f795d4b4f3b870833847c461801f6750a9fa8e61ea5ac53f9422b31f250
sha256: "53e943d4206a5e30df338fd4c6e7a077e02254531b138a15aec3bd143c1a8b3c"
url: "https://pub.dev"
source: hosted
version: "1.9.1"
version: "1.10.0"
stack_trace:
dependency: transitive
description:
Expand Down Expand Up @@ -368,10 +368,10 @@ packages:
dependency: transitive
description:
name: test_api
sha256: eb6ac1540b26de412b3403a163d919ba86f6a973fe6cc50ae3541b80092fdcfb
sha256: "75760ffd7786fffdfb9597c35c5b27eaeec82be8edfb6d71d32651128ed7aab8"
url: "https://pub.dev"
source: hosted
version: "0.5.1"
version: "0.6.0"
typed_data:
dependency: transitive
description:
Expand All @@ -388,6 +388,14 @@ packages:
url: "https://pub.dev"
source: hosted
version: "2.1.4"
web:
dependency: transitive
description:
name: web
sha256: dc8ccd225a2005c1be616fe02951e2e342092edf968cf0844220383757ef8f10
url: "https://pub.dev"
source: hosted
version: "0.1.4-beta"
win32:
dependency: transitive
description:
Expand All @@ -405,5 +413,5 @@ packages:
source: hosted
version: "1.0.0"
sdks:
dart: ">=3.0.0-0 <4.0.0"
dart: ">=3.1.0-185.0.dev <4.0.0"
flutter: ">=3.0.0"
19 changes: 18 additions & 1 deletion lib/src/nearby.dart
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ class Nearby {
OnPayloadTransferUpdate? _onPayloadTransferUpdate;

static const MethodChannel _channel =
const MethodChannel('nearby_connections');
const MethodChannel('nearby_connections');
static const EventChannel _eventChannel =
const EventChannel('nearby_connections/stream');

/// convenience method
///
Expand Down Expand Up @@ -358,6 +360,21 @@ class Nearby {
);
}

/// Send stream payload
///
void sendStreamPayload(String endpointId, Stream<Uint8List> stream) async {
stream.listen((data) {
_channel.invokeListMethod('addToSenderStream', <String, dynamic>{
'endpointId': endpointId,
'bytes': data,
});
});
}

Stream<dynamic> getReceiverStream() {
return _eventChannel.receiveBroadcastStream();
}

mannprerak2 marked this conversation as resolved.
Show resolved Hide resolved
/// Use it to cancel/stop a payload transfer
Future<void> cancelPayload(int payloadId) async {
return await _channel.invokeMethod(
Expand Down
Loading
Loading