-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streaming #68
base: master
Are you sure you want to change the base?
Streaming #68
Changes from all commits
4812596
b8a8b63
5f5ae8e
2f4f5c5
4448c2c
db14848
ec03f71
62db8bc
985afa6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package com.pkmnapps.nearby_connections; | ||
|
||
import android.app.Activity; | ||
import android.os.ParcelFileDescriptor; | ||
import android.util.Log; | ||
import android.os.Build.VERSION; | ||
import android.os.Build.VERSION_CODES; | ||
|
@@ -21,8 +22,10 @@ | |
import com.google.android.gms.nearby.connection.PayloadCallback; | ||
import com.google.android.gms.nearby.connection.PayloadTransferUpdate; | ||
import com.google.android.gms.nearby.connection.Strategy; | ||
import com.google.android.gms.tasks.OnCompleteListener; | ||
import com.google.android.gms.tasks.OnFailureListener; | ||
import com.google.android.gms.tasks.OnSuccessListener; | ||
import com.google.android.gms.tasks.Task; | ||
|
||
import java.io.File; | ||
import java.io.FileNotFoundException; | ||
|
@@ -38,6 +41,7 @@ | |
import io.flutter.embedding.engine.plugins.activity.ActivityPluginBinding; | ||
import io.flutter.plugin.common.MethodCall; | ||
import io.flutter.plugin.common.MethodChannel; | ||
import io.flutter.plugin.common.EventChannel; | ||
import io.flutter.plugin.common.MethodChannel.MethodCallHandler; | ||
import io.flutter.plugin.common.MethodChannel.Result; | ||
import io.flutter.plugin.common.PluginRegistry; | ||
|
@@ -50,12 +54,54 @@ public class NearbyConnectionsPlugin implements MethodCallHandler, FlutterPlugin | |
private Activity activity; | ||
private static final String SERVICE_ID = "com.pkmnapps.nearby_connections"; | ||
private static MethodChannel channel; | ||
private static EventChannel eventChannel; | ||
private static PluginRegistry.Registrar pluginRegistrar; | ||
|
||
private static NearbyConnectionsStreamHandler nearbyConnectionsStreamHandler; | ||
private static ParcelFileDescriptor[] senderPayloadPipe = null; | ||
private static OutputStream senderOutputStream = null; | ||
private static InputStream senderInputStream = null; | ||
private static Thread senderOutputStreamThread = null; | ||
private volatile byte[] bytes = null; | ||
private volatile boolean isPipeOpen = false; | ||
private NearbyConnectionsPlugin(Activity activity) { | ||
this.activity = activity; | ||
} | ||
public NearbyConnectionsPlugin(){} | ||
public NearbyConnectionsPlugin(){ | ||
} | ||
|
||
// Stream Handler for event channel for receiver side | ||
static class NearbyConnectionsStreamHandler implements EventChannel.StreamHandler { | ||
|
||
public InputStream inputReceiverStream; | ||
private EventChannel.EventSink eventSink; | ||
@Override | ||
public void onListen(Object arguments, EventChannel.EventSink events) { | ||
if (events == null) return; | ||
eventSink = events; | ||
} | ||
|
||
@Override | ||
public void onCancel(Object arguments) { | ||
} | ||
|
||
public void initializeInputReceiverStream(InputStream inputStream) { | ||
inputReceiverStream = inputStream; | ||
byte[] bytes; | ||
while (true) { | ||
try { | ||
if (inputReceiverStream.available() <= 0) break; | ||
bytes = new byte[inputReceiverStream.read()]; | ||
inputReceiverStream.read(bytes); | ||
if(eventSink != null) { | ||
eventSink.success(bytes); | ||
} | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be replaced with |
||
} | ||
} | ||
} | ||
} | ||
|
||
|
||
/** | ||
* Legacy Plugin registration. | ||
|
@@ -65,6 +111,9 @@ public static void registerWith(Registrar registrar) { | |
pluginRegistrar = registrar; | ||
channel = new MethodChannel(registrar.messenger(), "nearby_connections"); | ||
channel.setMethodCallHandler(new NearbyConnectionsPlugin(registrar.activity())); | ||
eventChannel = new EventChannel(registrar.messenger(), "nearby_connections/stream"); | ||
nearbyConnectionsStreamHandler = new NearbyConnectionsStreamHandler(); | ||
eventChannel.setStreamHandler(nearbyConnectionsStreamHandler); | ||
} | ||
|
||
@Override | ||
|
@@ -260,6 +309,77 @@ public void onFailure(@NonNull Exception e) { | |
} | ||
break; | ||
} | ||
case "addToSenderStream": { | ||
final String endpointId = (String) call.argument("endpointId"); | ||
bytes = (byte[]) call.argument("bytes"); | ||
assert endpointId != null; | ||
assert bytes != null; | ||
if (!isPipeOpen && senderPayloadPipe == null) { | ||
try { | ||
senderPayloadPipe = ParcelFileDescriptor.createPipe(); | ||
// 0 is for reading | ||
// 1 is for writing | ||
senderInputStream = new ParcelFileDescriptor.AutoCloseInputStream(senderPayloadPipe[0]); | ||
Nearby.getConnectionsClient(activity).sendPayload(endpointId, Payload.fromStream(senderInputStream)).addOnSuccessListener(new OnSuccessListener<Void>() { | ||
@Override | ||
public void onSuccess(Void aVoid) { | ||
|
||
} | ||
}).addOnFailureListener(new OnFailureListener() { | ||
@Override | ||
public void onFailure(@NonNull Exception e) { | ||
Log.d("nearby_connections", "Stream Failed"); | ||
Log.d("nearby_connections", e.getMessage()); | ||
try { | ||
senderPayloadPipe[0].close(); | ||
senderPayloadPipe[1].close(); | ||
} catch (IOException ex) { | ||
ex.printStackTrace(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace all |
||
} finally { | ||
isPipeOpen = false; | ||
senderPayloadPipe = null; | ||
senderInputStream = null; | ||
senderOutputStream = null; | ||
} | ||
|
||
} | ||
}).addOnCompleteListener(new OnCompleteListener<Void>() { | ||
@Override | ||
public void onComplete(@NonNull Task<Void> task) { | ||
} | ||
}); | ||
senderOutputStream = new ParcelFileDescriptor.AutoCloseOutputStream(senderPayloadPipe[1]); | ||
isPipeOpen = true; | ||
} catch (IOException e) { | ||
Log.d("nearby_connections", "Error while creating pipe"); | ||
Log.d("nearby_connections", e.getMessage()); | ||
senderPayloadPipe = null; | ||
} | ||
} | ||
try { | ||
if (senderOutputStream != null && isPipeOpen && senderInputStream.available() > 0) { | ||
Log.d("nearby_connections", "Adding to stream in line 2"); | ||
senderOutputStream.write(bytes); | ||
senderOutputStream.flush(); | ||
} else if (senderInputStream.available() <= 0) { | ||
Log.d("nearby_connections", "Closing pipe"); | ||
senderPayloadPipe[0].close(); | ||
senderPayloadPipe[1].close(); | ||
isPipeOpen = false; | ||
senderPayloadPipe = null; | ||
senderInputStream = null; | ||
senderOutputStream = null; | ||
} | ||
} catch (IOException e) { | ||
isPipeOpen = false; | ||
senderInputStream = null; | ||
senderOutputStream = null; | ||
senderPayloadPipe = null; | ||
Log.d("nearby_connections", e.getMessage()); | ||
} | ||
result.success(true); | ||
break; | ||
} | ||
case "cancelPayload": { | ||
String payloadId = (String) call.argument("payloadId"); | ||
assert payloadId != null; | ||
|
@@ -386,6 +506,10 @@ public void onPayloadReceived(@NonNull String endpointId, @NonNull Payload paylo | |
// This is deprecated and only available on Android 10 and below. | ||
args.put("filePath", payload.asFile().asJavaFile().getAbsolutePath()); | ||
} | ||
} else if (payload.getType() == Payload.Type.STREAM) { | ||
InputStream inputStream = payload.asStream().asInputStream(); | ||
Log.d("nearby_connections", "Input Stream Received"); | ||
nearbyConnectionsStreamHandler.initializeInputReceiverStream(inputStream); | ||
} | ||
|
||
channel.invokeMethod("onPayloadReceived", args); | ||
|
@@ -446,6 +570,9 @@ private Strategy getStrategy(int strategy) { | |
public void onAttachedToEngine(@NonNull FlutterPluginBinding binding) { | ||
channel = new MethodChannel(binding.getBinaryMessenger(), "nearby_connections"); | ||
channel.setMethodCallHandler(this); | ||
eventChannel = new EventChannel(binding.getBinaryMessenger(), "nearby_connections/stream"); | ||
nearbyConnectionsStreamHandler = new NearbyConnectionsStreamHandler(); | ||
eventChannel.setStreamHandler(nearbyConnectionsStreamHandler); | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -340,6 +340,18 @@ class _MyBodyState extends State<Body> { | |
} | ||
}, | ||
), | ||
ElevatedButton( | ||
child: const Text("Send Stream Payload"), | ||
onPressed: () async { | ||
endpointMap.forEach((key, value) { | ||
String a = Random().nextInt(100).toString(); | ||
|
||
showSnackbar("Sending $a to ${value.endpointName}, id: $key"); | ||
Nearby() | ||
.sendStreamPayload(key, Uint8List.fromList(a.codeUnits)); | ||
}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not really a usecase for streams, nor it depicts how to send an endless stream of data. the linked example uses audio, but if we want to avoid that, we can instead ask the user to hold the button, and send random bytes of payload while the button is on hold. Basically an example of sending a stream of data without any known size There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mannprerak2 Bro, have you tested the streams in Java code yourself? Because, the streaming only works for the first time we use Nearby.getConnectionsClient(activity).sendPayload(endpointId, Payload.fromStream(inputStream)). If I do the same for the second time, the streaming doesn't work. Here is the code which I've written for the logic. https://github.com/lakshmanprabhu49/nearby_connections/tree/Streaming_v2 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can push commits to this PR itself. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mannprerak2 I've pushed the code here. Can you review? As far as I tested in my local, the stream was sent from sender to receiver. But not sure if this will work in all scenarios There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added more comments There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you also change the example to send maybe a stream of numbers at a gap of 1 second. E.g - Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(5)
.listen((i) {
Nearby().sendStreamPayload(key, Uint8List.fromList(i.toString().codeUnits));
});
Also, the receiver side should show a short snackbar (of duration 0.9 seconds) for every received stream |
||
}, | ||
), | ||
ElevatedButton( | ||
child: const Text("Print file names."), | ||
onPressed: () async { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its possible the while loop ends up being faster than the stream speed and we would end up no longer listening to the receiver stream.
I think we should follow the android docs here : https://developers.google.com/nearby/connections/android/exchange-data#stream:~:text=SystemClock.elapsedRealtime()%20%2D%20lastRead)%20%3E%3D%20READ_STREAM_IN_BG_TIMEOUT
Basically, a configured timeout value (READ_STREAM_IN_BG_TIMEOUT) after which the stream could be considered dead.
We should also let the user change the timeout (maybe a method channel call to update the stream timeout)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also on timeout we should call
events.endOfStream()