-
Notifications
You must be signed in to change notification settings - Fork 19
/
AmazonSNSReceiver.java
115 lines (90 loc) · 4.11 KB
/
AmazonSNSReceiver.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.ConfirmSubscriptionRequest;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import org.codehaus.jackson.map.ObjectMapper;
import org.mortbay.jetty.Request;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.handler.AbstractHandler;
/**
 * Example Amazon SNS receiver.
 *
 * <p>Creates (or looks up) the topic "MyTopic", starts an embedded Jetty HTTP server,
 * subscribes that server's address to the topic over the "http" protocol, and then loops
 * forever: subscription confirmations are acknowledged and notification bodies are printed
 * to standard output.
 *
 * <p>The HTTP handler thread and the receive loop communicate only through
 * {@link #messageQueue}.
 */
public class AmazonSNSReceiver {

    // AWS credentials -- replace with your credentials
    static String ACCESS_KEY = "<Your AWS Access Key>";
    static String SECRET_KEY = "<Your AWS Secret Key>";

    // Shared queue for notifications from HTTP server (filled by the handler, drained by main)
    static BlockingQueue<Map<String, String>> messageQueue = new LinkedBlockingQueue<Map<String, String>>();

    /**
     * Receiver loop.
     *
     * @param args optional single argument: the HTTP port to listen on (defaults to 8989)
     * @throws Exception if the topic cannot be created, the server cannot start, the
     *         subscription fails, or the thread is interrupted while waiting for a message
     */
    public static void main(String... args) throws Exception {
        // Create a client
        AmazonSNSClient service = new AmazonSNSClient(new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY));

        // Create a topic
        CreateTopicRequest createReq = new CreateTopicRequest()
                .withName("MyTopic");
        CreateTopicResult createRes = service.createTopic(createReq);

        // Get an HTTP Port
        int port = args.length == 1 ? Integer.parseInt(args[0]) : 8989;

        // Create and start HTTP server
        Server server = new Server(port);
        server.setHandler(new AmazonSNSHandler());
        server.start();

        // Subscribe to topic; the server is already listening so the confirmation
        // request that follows the subscribe call can be received
        SubscribeRequest subscribeReq = new SubscribeRequest()
                .withTopicArn(createRes.getTopicArn())
                .withProtocol("http")
                .withEndpoint("http://" + InetAddress.getLocalHost().getHostAddress() + ":" + port);
        service.subscribe(subscribeReq);

        for (;;) {
            // Wait for a message from HTTP server
            Map<String, String> messageMap = messageQueue.take();

            // Look for a subscription confirmation Token. An UnsubscribeConfirmation
            // message also carries a Token, but confirming it would re-subscribe the
            // endpoint that was just unsubscribed, so it is deliberately not confirmed.
            String type = stringField(messageMap, "Type");
            String token = stringField(messageMap, "Token");
            if (token != null && !"UnsubscribeConfirmation".equals(type)) {
                // Confirm subscription
                ConfirmSubscriptionRequest confirmReq = new ConfirmSubscriptionRequest()
                        .withTopicArn(createRes.getTopicArn())
                        .withToken(token);
                service.confirmSubscription(confirmReq);
                continue;
            }

            // Check for a notification
            String message = stringField(messageMap, "Message");
            if (message != null) {
                System.out.println("Received message: " + message);
            }
        }
    }

    /**
     * Returns the value stored under {@code key} if it is a string, otherwise {@code null}.
     *
     * <p>The map is produced by an unchecked JSON conversion, so a value may not actually be
     * a {@code String}; reading it through {@code Object} avoids a ClassCastException that
     * would otherwise terminate the receive loop.
     */
    private static String stringField(Map<String, String> messageMap, String key) {
        Object value = ((Map<?, ?>) messageMap).get(key);
        return (value instanceof String) ? (String) value : null;
    }

    /**
     * HTTP handler: parses each request body as a JSON object and hands it to the
     * receive loop through {@link AmazonSNSReceiver#messageQueue}.
     */
    static class AmazonSNSHandler extends AbstractHandler {

        // One mapper shared by all requests instead of a new instance per request
        private static final ObjectMapper MAPPER = new ObjectMapper();

        /**
         * Handle HTTP request.
         *
         * <p>Responds 200 once the parsed message has been enqueued, or 400 when the body
         * is not a JSON object.
         *
         * @throws IOException if reading the request body fails
         */
        public void handle(String target, HttpServletRequest request, HttpServletResponse response, int dispatch) throws IOException {
            // Scan request into a string, decoding explicitly as UTF-8 rather than
            // with the platform default charset
            Scanner scanner = new Scanner(request.getInputStream(), "UTF-8");
            StringBuilder sb = new StringBuilder();
            while (scanner.hasNextLine()) {
                sb.append(scanner.nextLine());
            }
            // Scanner hides read failures; surface them instead of parsing a truncated body
            IOException readError = scanner.ioException();
            if (readError != null) {
                throw readError;
            }

            // Build a message map from the JSON encoded message
            Map<String, String> messageMap;
            try {
                messageMap = parseMessage(sb.toString());
            } catch (IOException e) {
                // The bytes are already in memory, so this can only be a JSON parse/mapping failure
                System.err.println("Rejecting request with malformed JSON body: " + e.getMessage());
                messageMap = null;
            }

            // A JSON "null" body parses to a null map, which the queue cannot hold
            if (messageMap == null) {
                response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
                ((Request) request).setHandled(true);
                return;
            }

            // Enqueue message map for receive loop
            messageQueue.add(messageMap);

            // Set HTTP response
            response.setContentType("text/html");
            response.setStatus(HttpServletResponse.SC_OK);
            ((Request) request).setHandled(true);
        }

        /**
         * Parses a JSON document into a map.
         *
         * <p>The conversion is unchecked: values are whatever the JSON contained, so
         * callers must not assume every value is a {@code String}.
         *
         * @throws IOException if the text is not valid JSON or is not a JSON object
         */
        @SuppressWarnings("unchecked")
        private static Map<String, String> parseMessage(String json) throws IOException {
            InputStream bytes = new ByteArrayInputStream(json.getBytes("UTF-8"));
            return MAPPER.readValue(bytes, Map.class);
        }
    }
}