Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect and recycle broken connections on non-GCM devices #8230

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,10 @@ synchronized SignalServiceMessageSender provideSignalMessageSender() {
new SignalProtocolStoreImpl(context),
BuildConfig.USER_AGENT,
TextSecurePreferences.isMultiDevice(context),
Optional.fromNullable(IncomingMessageObserver.getPipe()),
Optional.fromNullable(IncomingMessageObserver.getUnidentifiedPipe()),
IncomingMessageObserver.getPipeReference(),
IncomingMessageObserver.getUnidentifiedPipeReference(),
Optional.of(new SecurityEventListener(context)));
} else {
this.messageSender.setMessagePipe(IncomingMessageObserver.getPipe(), IncomingMessageObserver.getUnidentifiedPipe());
this.messageSender.setIsMultiDevice(TextSecurePreferences.isMultiDevice(context));
}

Expand Down
125 changes: 117 additions & 8 deletions src/org/thoughtcrime/securesms/service/IncomingMessageObserver.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package org.thoughtcrime.securesms.service;

import android.app.Service;
import android.content.BroadcastReceiver;
import android.arch.lifecycle.DefaultLifecycleObserver;
import android.arch.lifecycle.LifecycleOwner;
import android.arch.lifecycle.ProcessLifecycleOwner;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.net.ConnectivityManager;
import android.net.Network;
import android.net.NetworkRequest;
import android.net.NetworkInfo;
import android.os.Build;
import android.os.IBinder;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
Expand All @@ -28,6 +35,8 @@
import org.whispersystems.signalservice.api.SignalServiceMessagePipe;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -46,6 +55,14 @@ public class IncomingMessageObserver implements InjectableType, ConstraintObserv
private final Context context;
private final NetworkConstraint networkConstraint;

public static AtomicReference<SignalServiceMessagePipe> pipeReference = new AtomicReference<>();
public static AtomicReference<SignalServiceMessagePipe> unidentifiedPipeReference = new AtomicReference<>();

private final MessageRetrievalThread retrievalThread;

private BroadcastReceiver connectivityChangeReceiver;
private ConnectivityManager.NetworkCallback connectivityChangeCallback;

private boolean appVisible;

@Inject SignalServiceMessageReceiver receiver;
Expand All @@ -58,10 +75,13 @@ public IncomingMessageObserver(@NonNull Context context) {
this.networkConstraint = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create();

new NetworkConstraintObserver(ApplicationContext.getInstance(context)).register(this);
new MessageRetrievalThread().start();

retrievalThread = new MessageRetrievalThread();
retrievalThread.start();

if (TextSecurePreferences.isFcmDisabled(context)) {
ContextCompat.startForegroundService(context, new Intent(context, ForegroundService.class));
setupNetworkMonitoring();
}

ProcessLifecycleOwner.get().getLifecycle().addObserver(new DefaultLifecycleObserver() {
Expand Down Expand Up @@ -108,10 +128,12 @@ private synchronized boolean isConnectionNecessary() {
}

private synchronized void waitForConnectionNecessary() {
try {
while (!isConnectionNecessary()) wait();
} catch (InterruptedException e) {
throw new AssertionError(e);
while (!isConnectionNecessary()) {
try {
wait();
} catch (InterruptedException e) {
Log.d(TAG, "Retrieval thread interrupted while not connected; ignoring.");
}
}
}

Expand All @@ -132,6 +154,14 @@ private void shutdown(SignalServiceMessagePipe pipe, SignalServiceMessagePipe un
return unidentifiedPipe;
}

public static AtomicReference<SignalServiceMessagePipe> getPipeReference() {
return pipeReference;
}

public static AtomicReference<SignalServiceMessagePipe> getUnidentifiedPipeReference() {
return unidentifiedPipeReference;
}

private class MessageRetrievalThread extends Thread implements Thread.UncaughtExceptionHandler {

MessageRetrievalThread() {
Expand All @@ -149,11 +179,14 @@ public void run() {
pipe = receiver.createMessagePipe();
unidentifiedPipe = receiver.createUnidentifiedMessagePipe();

SignalServiceMessagePipe localPipe = pipe;
SignalServiceMessagePipe unidentifiedLocalPipe = unidentifiedPipe;
pipeReference.set(pipe);
unidentifiedPipeReference.set(unidentifiedPipe);

final SignalServiceMessagePipe localPipe = pipe;
final SignalServiceMessagePipe unidentifiedLocalPipe = unidentifiedPipe;

try {
while (isConnectionNecessary()) {
while (isConnectionNecessary() && !interrupted()) {
try {
Log.i(TAG, "Reading message...");
localPipe.read(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES,
Expand All @@ -167,6 +200,10 @@ public void run() {
Log.w(TAG, e);
}
}
} catch (InterruptedException e) {
Log.d(TAG, "Retrieval thread interrupted.");
} catch (IOException e) {
Log.d(TAG, "Message pipe failed: " + e.getMessage());
} catch (Throwable e) {
Log.w(TAG, e);
} finally {
Expand All @@ -185,6 +222,78 @@ public void uncaughtException(Thread t, Throwable e) {
}
}

private void setupNetworkMonitoring() {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE);

connectivityChangeCallback = new ConnectivityManager.NetworkCallback() {
private Network current;

private void update(Network network) {
final Network previous = current;
current = network;

Log.d(TAG, "Currently active network: " + network);

if (previous != null && (current == null || !current.equals(previous))) {
Log.d(TAG,
"Active network changed (" + previous + " -> " + current +
"); interrupting the retrieval thread to recycle the pipe.");

retrievalThread.interrupt();
}
}

@Override
public void onAvailable(Network network) {
final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE);

update(connectivityManager.getActiveNetwork());
}

@Override
public void onLost(Network network) {
final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE);

update(connectivityManager.getActiveNetwork());
}
};

connectivityManager.registerNetworkCallback(new NetworkRequest.Builder().build(),
connectivityChangeCallback);
} else {
connectivityChangeReceiver = new BroadcastReceiver() {
private int current = -1;

@Override
public void onReceive(Context context, Intent intent) {
final ConnectivityManager connectivityManager = (ConnectivityManager)context.getSystemService(Context.CONNECTIVITY_SERVICE);

final NetworkInfo info = connectivityManager.getActiveNetworkInfo();
final int previous = current;

if (info == null) {
current = -1;
} else if (info.isConnected()) {
current = info.getType();
}

Log.d(TAG, "Currently active network: " + current);

if (previous != -1 && previous != current) {
Log.d(TAG,
"Active network changed (" + previous + " -> " + current +
"); interrupting the retrieval thread to recycle the pipe.");
retrievalThread.interrupt();
}
}
};

context.registerReceiver(connectivityChangeReceiver,
new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
}
}

public static class ForegroundService extends Service {

@Override
Expand Down