forked from crumbjp/test
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TestMain.java
136 lines (130 loc) · 3.73 KB
/
TestMain.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
public class TestMain {
static class MsgBase {
public static enum TYPE {
UNDEF,
GET,
SET,
FINNISH,
}
public TYPE type = MsgBase.TYPE.UNDEF;
public IoSession session;
public MsgBase(TYPE t) {
this.type = t;
}
}
/**
* @param args
*/
public static void main(String[] args) throws Exception {
IoAcceptor acceptor = new VmPipeAcceptor();
VmPipeAddress address = new VmPipeAddress(8080);
// Worker thread
final BlockingQueue<MsgBase> workerQueue = new ArrayBlockingQueue<MsgBase>(10);
final Thread th = new Thread(){
public final BlockingQueue<MsgBase> q = workerQueue;
public void run() {
while (true) {
try {
MsgBase m = q.take();
switch (m.type) {
case GET:
Thread.sleep(1000);
m.session.write("GET finnished.");
break;
case SET:
Thread.sleep(1000);
m.session.write("SET finnished.");
break;
case FINNISH:
return;
default:
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
th.start();
// Server
acceptor.setHandler(new IoHandlerAdapter(){
private final int id = 0;
public void sessionOpened(IoSession session) {
System.out.println("Server-" + id + ": READY");
}
public void sessionClosed(IoSession session) {
System.out.println("Server-" + id + ": QUIT");
}
public void messageReceived(IoSession session, Object message) {
System.out.println("Server-" + id + ": RCVD " + message);
try {
MsgBase m = (MsgBase)message;
switch (m.type) {
case GET:
m.session = session;
workerQueue.put(m);
break;
case SET:
m.session = session;
workerQueue.put(m);
break;
default:
break;
}
}catch ( Throwable e){
;
}
}
public void messageSent(IoSession session, Object message) {
System.out.println("Server-" + id + ": SENT " + message);
}
public void exceptionCaught(IoSession session, Throwable cause) {
cause.printStackTrace();
session.close(true);
}
});
acceptor.bind(address);
// Client
VmPipeConnector connector = new VmPipeConnector();
connector.setHandler(new IoHandlerAdapter() {
public void sessionOpened(IoSession arg0) throws Exception {
}
public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception {
}
public void sessionCreated(IoSession arg0) throws Exception {
}
public void sessionClosed(IoSession arg0) throws Exception {
}
public void messageSent(IoSession arg0, Object arg1) throws Exception {
}
public void messageReceived(IoSession arg0, Object message) throws Exception {
System.out.println("Client-" + 0 + ": RCV " + message);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
cause.printStackTrace();
session.close(true);
}
});
ConnectFuture future = connector.connect(address);
future.awaitUninterruptibly();
IoSession session = future.getSession();
session.write(new MsgBase(MsgBase.TYPE.GET));
session.write(new MsgBase(MsgBase.TYPE.SET));
session.getCloseFuture().awaitUninterruptibly();
workerQueue.put(new MsgBase(MsgBase.TYPE.FINNISH));
acceptor.unbind();
}
}