Skip to content

Commit

Permalink
- Applied patch: belaban#104
Browse files Browse the repository at this point in the history
- Made all private members protected
  • Loading branch information
belaban committed Sep 6, 2013
1 parent 261600b commit a478143
Showing 1 changed file with 69 additions and 73 deletions.
142 changes: 69 additions & 73 deletions src/org/jgroups/protocols/FD_SOCK.java
Original file line number Diff line number Diff line change
@@ -33,16 +33,16 @@
*/
@MBean(description="Failure detection protocol based on sockets connecting members")
public class FD_SOCK extends Protocol implements Runnable {
private static final int NORMAL_TERMINATION=9;
private static final int ABNORMAL_TERMINATION=-1;
protected static final int NORMAL_TERMINATION=9;
protected static final int ABNORMAL_TERMINATION=-1;

/* ----------------------------------------- Properties -------------------------------------------------- */

@LocalAddress
@Property(description="The NIC on which the ServerSocket should listen on. " +
"The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL and NON_LOOPBACK",
systemProperty={Global.BIND_ADDR},writable=false)
InetAddress bind_addr=null;
protected InetAddress bind_addr=null;

@Property(description="Use \"external_addr\" if you have hosts on different networks, behind " +
"firewalls. On each firewall, set up a port forwarding rule (sometimes called \"virtual server\") to " +
@@ -53,79 +53,79 @@ public class FD_SOCK extends Protocol implements Runnable {

@Property(description="Used to map the internal port (bind_port) to an external port. Only used if > 0",
systemProperty=Global.EXTERNAL_PORT,writable=false)
protected int external_port=0;
protected int external_port=0;

@Property(name="bind_interface", converter=PropertyConverters.BindInterface.class,
description="The interface (NIC) which should be used by this transport", dependsUpon="bind_addr")
protected String bind_interface_str=null;
protected String bind_interface_str=null;

@Property(description="Timeout for getting socket cache from coordinator. Default is 1000 msec")
long get_cache_timeout=1000;
protected long get_cache_timeout=1000;

@Property(description="Interval for broadcasting suspect messages. Default is 5000 msec")
long suspect_msg_interval=5000;
protected long suspect_msg_interval=5000;

@Property(description="Number of attempts coordinator is solicited for socket cache until we give up. Default is 3")
int num_tries=3;
protected int num_tries=3;

@Property(description="Start port for server socket. Default value of 0 picks a random port")
int start_port=0;
protected int start_port=0;

@Property(description="Start port for client socket. Default value of 0 picks a random port")
int client_bind_port=0;
protected int client_bind_port=0;

@Property(description="Number of ports to probe for start_port and client_bind_port")
int port_range=50;
protected int port_range=50;

@Property(description="Whether to use KEEP_ALIVE on the ping socket or not. Default is true")
private boolean keep_alive=true;
protected boolean keep_alive=true;

@Property(description="Max time in millis to wait for ping Socket.connect() to return")
private int sock_conn_timeout=1000;
protected int sock_conn_timeout=1000;


/* --------------------------------------------- JMX ------------------------------------------------------ */


private int num_suspect_events=0;
protected int num_suspect_events=0;

private final BoundedList<Address> suspect_history=new BoundedList<Address>(20);
protected final BoundedList<Address> suspect_history=new BoundedList<Address>(20);


/* --------------------------------------------- Fields ------------------------------------------------------ */


private volatile List<Address> members=new ArrayList<Address>(11); // volatile eliminates the lock
protected volatile List<Address> members=new ArrayList<Address>(11); // volatile eliminates the lock

protected final Set<Address> suspected_mbrs=new CopyOnWriteArraySet<Address>();
protected final Set<Address> suspected_mbrs=new CopyOnWriteArraySet<Address>();

private final List<Address> pingable_mbrs=new CopyOnWriteArrayList<Address>();
protected final List<Address> pingable_mbrs=new CopyOnWriteArrayList<Address>();

volatile boolean srv_sock_sent=false; // has own socket been broadcast yet ?
protected volatile boolean srv_sock_sent=false; // has own socket been broadcast yet ?
/** Used to rendezvous on GET_CACHE and GET_CACHE_RSP */
private final Promise<Map<Address,IpAddress>> get_cache_promise=new Promise<Map<Address,IpAddress>>();
private volatile boolean got_cache_from_coord=false; // was cache already fetched ?
private Address local_addr=null; // our own address
private ServerSocket srv_sock=null; // server socket to which another member connects to monitor me

private ServerSocketHandler srv_sock_handler=null; // accepts new connections on srv_sock
private IpAddress srv_sock_addr=null; // pair of server_socket:port
private Address ping_dest=null; // address of the member we monitor
private Socket ping_sock=null; // socket to the member we monitor
private InputStream ping_input=null; // input stream of the socket to the member we monitor
protected final Promise<Map<Address,IpAddress>> get_cache_promise=new Promise<Map<Address,IpAddress>>();
protected volatile boolean got_cache_from_coord=false; // was cache already fetched ?
protected Address local_addr=null; // our own address
protected ServerSocket srv_sock=null; // server socket to which another member connects to monitor me

protected ServerSocketHandler srv_sock_handler=null; // accepts new connections on srv_sock
protected IpAddress srv_sock_addr=null; // pair of server_socket:port
protected Address ping_dest=null; // address of the member we monitor
protected Socket ping_sock=null; // socket to the member we monitor
protected InputStream ping_input=null; // input stream of the socket to the member we monitor
@GuardedBy("this")
private volatile Thread pinger_thread=null; // listens on ping_sock, suspects member if socket is closed
protected volatile Thread pinger_thread=null; // listens on ping_sock, suspects member if socket is closed

/** Cache of member addresses and their ServerSocket addresses */
private final ConcurrentMap<Address,IpAddress> cache=Util.createConcurrentMap(11);
protected final ConcurrentMap<Address,IpAddress> cache=Util.createConcurrentMap(11);

private final Promise<IpAddress> ping_addr_promise=new Promise<IpAddress>(); // to fetch the ping_addr for ping_dest
private final Object sock_mutex=new Object(); // for access to ping_sock, ping_input
private TimeScheduler timer=null;
private final BroadcastTask bcast_task=new BroadcastTask(); // to transmit SUSPECT message (until view change)
private volatile boolean regular_sock_close=false; // used by interruptPingerThread() when new ping_dest is computed
protected final Promise<IpAddress> ping_addr_promise=new Promise<IpAddress>(); // to fetch the ping_addr for ping_dest
protected final Object sock_mutex=new Object(); // for access to ping_sock, ping_input
protected TimeScheduler timer=null;
protected final BroadcastTask bcast_task=new BroadcastTask(); // to transmit SUSPECT message (until view change)
protected volatile boolean regular_sock_close=false; // used by interruptPingerThread() when new ping_dest is computed

private boolean log_suspected_msgs=true;
protected boolean log_suspected_msgs=true;


public FD_SOCK() {
@@ -172,19 +172,16 @@ public String printCache() {

@ManagedOperation(description="Starts node crash monitor if member count > 1 and monitor is not running")
public boolean startNodeCrashMonitor() {
boolean started = false;
if( members.size() > 1 ) {
if( startPingerThread() ) {
if(members.size() > 1) {
if(startPingerThread()) {
log.warn("Node crash detection manually started, was not running for some reason.");
started = true;
return true;
}
else
log.warn("Node crash detection is already running.");
}
else {
log.info("Single node cluster, no need for node crash detection.");
log.debug("Node crash detection is already running.");
}
return started;
else
log.debug("Single node cluster, no need for node crash detection.");
return false;
}

public void init() throws Exception {
@@ -453,7 +450,7 @@ public void run() {
if(log.isTraceEnabled()) log.trace("pinger thread terminated");
}

private synchronized boolean isPingerThreadRunning(){
protected synchronized boolean isPingerThreadRunning(){
return pinger_thread != null && pinger_thread.isAlive() && !pinger_thread.isInterrupted();
}

@@ -463,7 +460,7 @@ private synchronized boolean isPingerThreadRunning(){
/* ----------------------------------- Private Methods -------------------------------------- */


void suspect(Set<Address> suspects) {
protected void suspect(Set<Address> suspects) {
if(suspects == null)
return;

@@ -489,7 +486,7 @@ void suspect(Set<Address> suspects) {
}


void handleSocketClose(Exception ex) {
protected void handleSocketClose(Exception ex) {
teardownPingSocket(); // make sure we have no leftovers
if(!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread())
if(log.isDebugEnabled())
@@ -507,16 +504,15 @@ void handleSocketClose(Exception ex) {
/**
* Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
*/
private synchronized boolean startPingerThread() {
boolean started = false;
protected synchronized boolean startPingerThread() {
if(!isPingerThreadRunning()) {
ThreadFactory factory=getThreadFactory();
pinger_thread=factory.newThread(this, "FD_SOCK pinger");
pinger_thread.setDaemon(true);
pinger_thread.start();
started = true;
return true;
}
return started;
return false;
}

/**
@@ -527,15 +523,15 @@ private synchronized boolean startPingerThread() {
* code portable and we don't have to check for OSs.<p/>
* Does *not* need to be synchronized on pinger_mutex because the caller (down()) already has the mutex acquired
*/
private void interruptPingerThread() {
protected void interruptPingerThread() {
if(isPingerThreadRunning()) {
regular_sock_close=true;
// sendPingInterrupt(); // PATCH by Bruce Schuchardt (http://jira.jboss.com/jira/browse/JGRP-246)
teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
}
}

private synchronized void stopPingerThread() {
protected synchronized void stopPingerThread() {
if(pinger_thread != null) {
regular_sock_close=true;
try {
@@ -556,12 +552,12 @@ private synchronized void stopPingerThread() {
}

// PATCH: send something so the connection handler can exit
void sendPingTermination() {
protected void sendPingTermination() {
sendPingSignal(NORMAL_TERMINATION);
}


void sendPingSignal(int signal) {
protected void sendPingSignal(int signal) {
synchronized(sock_mutex) {
if(ping_sock != null) {
try {
@@ -584,7 +580,7 @@ void sendPingSignal(int signal) {



void startServerSocket() throws Exception {
protected void startServerSocket() throws Exception {
srv_sock=Util.createServerSocket(getSocketFactory(),
"jgroups.fd_sock.srv_sock", bind_addr, start_port, start_port+port_range); // grab a random unused port above 10000
srv_sock_addr=new IpAddress(external_addr != null? external_addr : bind_addr, external_port > 0? external_port : srv_sock.getLocalPort());
@@ -602,7 +598,7 @@ public void stopServerSocket(boolean graceful) {
/**
* Creates a socket to <code>dest</code>, and assigns it to ping_sock. Also assigns ping_input
*/
boolean setupPingSocket(IpAddress dest) {
protected boolean setupPingSocket(IpAddress dest) {
synchronized(sock_mutex) {
if(dest == null) {
return false;
@@ -641,7 +637,7 @@ boolean setupPingSocket(IpAddress dest) {
}


void teardownPingSocket() {
protected void teardownPingSocket() {
synchronized(sock_mutex) {
if(ping_sock != null) {
try {
@@ -662,7 +658,7 @@ void teardownPingSocket() {
* Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_CACHE message
* to coordinator and wait for GET_CACHE_RSP response. Loop until valid response has been received.
*/
void getCacheFromCoordinator() {
protected void getCacheFromCoordinator() {
Address coord;
int attempts=num_tries;
Message msg;
@@ -700,7 +696,7 @@ void getCacheFromCoordinator() {
* that - at one point - either the coordinator or another participant taking over for a crashed coordinator, will
* react to the SUSPECT message and issue a new view, at which point the broadcast task stops.
*/
void broadcastSuspectMessage(Address suspected_mbr) {
protected void broadcastSuspectMessage(Address suspected_mbr) {
Message suspect_msg;
FdHeader hdr;

@@ -731,7 +727,7 @@ void broadcastSuspectMessage(Address suspected_mbr) {
Sends or broadcasts a I_HAVE_SOCK response. If 'dst' is null, the reponse will be broadcast, otherwise
it will be unicast back to the requester
*/
void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
protected void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
Message msg=new Message(dst).setFlag(Message.Flag.INTERNAL);
FdHeader hdr=new FdHeader(FdHeader.I_HAVE_SOCK);
hdr.mbr=mbr;
@@ -745,7 +741,7 @@ void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
Attempts to obtain the ping_addr first from the cache, then by unicasting q request to <code>mbr</code>,
then by multicasting a request to all members.
*/
private IpAddress fetchPingAddress(Address mbr) {
protected IpAddress fetchPingAddress(Address mbr) {
IpAddress ret;
Message ping_addr_req;
FdHeader hdr;
@@ -784,7 +780,7 @@ private IpAddress fetchPingAddress(Address mbr) {
}


private Address determinePingDest() {
protected Address determinePingDest() {
Address first_mbr = null;
boolean several_mbrs = false;
boolean found_local_addr = false;
@@ -822,7 +818,7 @@ protected Address determineCoordinator() {
}


static String signalToString(int signal) {
protected static String signalToString(int signal) {
switch(signal) {
case NORMAL_TERMINATION: return "NORMAL_TERMINATION";
case ABNORMAL_TERMINATION: return "ABNORMAL_TERMINATION";
@@ -1008,7 +1004,7 @@ public void readFrom(DataInput in) throws Exception {
* to create a connection will be blocked until the first client closes its connection. This should not be a problem
* as the ring nature of the FD_SOCK protocol always has only 1 client connect to its right-hand-side neighbor.
*/
private class ServerSocketHandler implements Runnable {
protected class ServerSocketHandler implements Runnable {
Thread acceptor=null;
/** List<ClientConnectionHandler> */
final List<ClientConnectionHandler> clients=new LinkedList<ClientConnectionHandler>();
@@ -1082,7 +1078,7 @@ public void run() {


/** Handles a client connection; multiple client can connect at the same time */
private static class ClientConnectionHandler implements Runnable {
protected static class ClientConnectionHandler implements Runnable {
Socket client_sock=null;
InputStream in;
final Object mutex=new Object();
@@ -1110,7 +1106,7 @@ void stopThread(boolean graceful) {
}
}

private void closeClientSocket() {
protected void closeClientSocket() {
synchronized(mutex) {
Util.close(client_sock);
client_sock=null;
@@ -1150,7 +1146,7 @@ public void run() {
* sure they are retransmitted until a view has been received which doesn't contain the suspected members
* any longer. Then the task terminates.
*/
private class BroadcastTask implements Runnable {
protected class BroadcastTask implements Runnable {
final Set<Address> suspects=new HashSet<Address>();
Future<?> future;

@@ -1185,7 +1181,7 @@ public void removeAll() {
}


private void startTask() {
protected void startTask() {
if(future == null || future.isDone()) {
try {
future=timer.scheduleWithFixedDelay(this, suspect_msg_interval, suspect_msg_interval, TimeUnit.MILLISECONDS);
@@ -1197,7 +1193,7 @@ private void startTask() {
}
}

private void stopTask() {
protected void stopTask() {
if(future != null) {
future.cancel(false);
future=null;

0 comments on commit a478143

Please sign in to comment.