From b2ec2fa41ae9035167c30fcd67f2ad45c2c18274 Mon Sep 17 00:00:00 2001 From: "Ben.Aldrich" Date: Tue, 17 Jun 2014 17:08:51 -0600 Subject: [PATCH 1/5] initial version to upgrade to 1.1.x es --- pom.xml | 32 +- .../elasticsearch/zeromq/ZMQRestRequest.java | 317 ++++++++---------- .../elasticsearch/zeromq/ZMQRestResponse.java | 166 ++++----- .../org/elasticsearch/zeromq/ZMQServer.java | 8 +- .../zeromq/impl/ZMQQueueServerImpl.java | 8 +- 5 files changed, 262 insertions(+), 269 deletions(-) diff --git a/pom.xml b/pom.xml index 6ddbd11..8610a9f 100644 --- a/pom.xml +++ b/pom.xml @@ -5,12 +5,12 @@ org.elasticsearch.plugin transport-zeromq jar - 0.0.4-SNAPSHOT + 0.0.5-SNAPSHOT ØMQ transport layer plugin for Elasticsearch - https://github.com/tlrx/transport-zeromq - + https://github.com/itiu/transport-zeromq + - 0.19.2 + 1.1.2 @@ -18,7 +18,7 @@ scm:git:git@github.com:tlrx/transport-zeromq.git scm:git:git@github.com:tlrx/transport-zeromq.git - + GitHub https://github.com/tlrx/transport-zeromq/issues/ @@ -43,14 +43,23 @@ org.elasticsearch.zeromq jzmq - 1.0.0 + 2.2.2-SNAPSHOT system - ${project.basedir}/jzmq-1.0.0.jar + /usr/local/probe/share/java/zmq.jar + + org.apache.maven.plugins + maven-surefire-plugin + 2.10 + + once + -Djava.library.path=/usr/local/probe/lib:/usr/local/probe/share/java + + com.github.github downloads-maven-plugin @@ -133,8 +142,9 @@ maven-compiler-plugin 2.3.2 - 1.6 - 1.6 + 1.7 + 1.7 + -Djava.library.path=/usr/local/probe/lib:/usr/local/probe/share/java @@ -148,7 +158,7 @@ - + sonatype @@ -164,4 +174,4 @@ file:///tmp/local_tmp_repo/ - + \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/zeromq/ZMQRestRequest.java b/src/main/java/org/elasticsearch/zeromq/ZMQRestRequest.java index 1009642..03b81ab 100644 --- a/src/main/java/org/elasticsearch/zeromq/ZMQRestRequest.java +++ b/src/main/java/org/elasticsearch/zeromq/ZMQRestRequest.java @@ -1,181 +1,160 @@ package org.elasticsearch.zeromq; -import org.elasticsearch.common.Bytes; -import org.elasticsearch.common.Unicode; -import org.elasticsearch.rest.support.AbstractRestRequest; -import org.elasticsearch.rest.support.RestUtils; -import org.elasticsearch.zeromq.exception.NoURIFoundZMQException; -import org.elasticsearch.zeromq.exception.UnsupportedMethodZMQException; -import org.elasticsearch.zeromq.exception.ZMQTransportException; - import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.support.RestUtils; +import org.elasticsearch.zeromq.exception.NoURIFoundZMQException; +import org.elasticsearch.zeromq.exception.UnsupportedMethodZMQException; +import org.elasticsearch.zeromq.exception.ZMQTransportException; /** * @author tlrx - * + * */ -public class ZMQRestRequest extends AbstractRestRequest { - - private final List parts; - - private Method method; - - private String uri; - - private String rawPath; - - private final Map params; - - public ByteBuffer body; - - public ZMQRestRequest(String payload, List parts) { - super(); - this.parts = parts; - this.params = new HashMap(); - - parse(payload); - } - - private void parse(String payload) { - - if (payload != null) { - - String[] s = payload.split("\\|"); - - if(s.length <2){ - throw new ZMQTransportException("Invalid message format"); - } - - // Method - String m = s[0]; - - if ("GET".equalsIgnoreCase(m)) { - this.method = Method.GET; - } else if ("POST".equalsIgnoreCase(m)) { - this.method = Method.POST; - } else if ("PUT".equalsIgnoreCase(m)) { - this.method = Method.PUT; - } else if ("DELETE".equalsIgnoreCase(m)) { - this.method = Method.DELETE; - } else if ("OPTIONS".equalsIgnoreCase(m)) { - this.method = Method.OPTIONS; - } else if ("HEAD".equalsIgnoreCase(m)) { - this.method = Method.HEAD; - } else { - throw new UnsupportedMethodZMQException(m); - } - - // URI - this.uri = s[1]; - - if((this.uri == null) || ("".equals(this.uri)) || "null".equalsIgnoreCase(this.uri)){ - throw new NoURIFoundZMQException(); - } - - int pathEndPos = uri.indexOf('?'); - if (pathEndPos < 0) { - this.rawPath = uri; - } else { - this.rawPath = uri.substring(0, pathEndPos); - RestUtils.decodeQueryString(uri, pathEndPos + 1, params); - } - - // Content - int indexContent = payload.indexOf(ZMQSocket.SEPARATOR, m.length() + uri.length()); - body = ByteBuffer.wrap(payload.substring(indexContent+1).getBytes()); - } - } - - @Override - public Method method() { - return this.method; - } - - @Override - public String uri() { - return this.uri; - } - - @Override - public String rawPath() { - return this.rawPath; - } - - @Override - public boolean hasContent() { - return ((body != null) && (body.remaining() > 0)); - } - - @Override - public boolean contentUnsafe() { - return false; - } - - @Override - public byte[] contentByteArray() { - if (body == null) { - return Bytes.EMPTY_ARRAY; - } - return body.array(); - } - - @Override - public int contentByteArrayOffset() { - if (body == null) { - return 0; - } - return body.arrayOffset() + body.position(); - } - - @Override - public int contentLength() { - if (body == null) { - return 0; - } - return body.remaining(); - } - - @Override - public String contentAsString() { - if (body == null) { - return ""; - } - return Unicode.fromBytes(contentByteArray(), contentByteArrayOffset(), - contentLength()); - } - - @Override - public String header(String name) { - return null; - } - - @Override - public boolean hasParam(String key) { - return params.containsKey(key); - } - - @Override - public String param(String key) { - - String p = params.get(key); - return p; - } - - @Override - public Map params() { - return params; - } - - @Override - public String param(String key, String defaultValue) { - String value = params.get(key); - if (value == null) { - return defaultValue; - } - return value; - } +public class ZMQRestRequest extends RestRequest{ + + private final List parts; + + private Method method; + + private String uri; + + private String rawPath; + + private final Map params; + private final Map headers; + + public ByteBuffer body; + + public ZMQRestRequest(String payload, List parts) { + super(); + this.parts = parts; + this.params = new HashMap(); + this.headers = new HashMap(); + + parse(payload); + } + + private void parse(String payload) { + + if (payload != null) { + + String[] s = payload.split("\\|"); + + if (s.length < 2) { + throw new ZMQTransportException("Invalid message format"); + } + + // Method + String m = s[0]; + + if ("GET".equalsIgnoreCase(m)) { + this.method = Method.GET; + } else if ("POST".equalsIgnoreCase(m)) { + this.method = Method.POST; + } else if ("PUT".equalsIgnoreCase(m)) { + this.method = Method.PUT; + } else if ("DELETE".equalsIgnoreCase(m)) { + this.method = Method.DELETE; + } else if ("OPTIONS".equalsIgnoreCase(m)) { + this.method = Method.OPTIONS; + } else if ("HEAD".equalsIgnoreCase(m)) { + this.method = Method.HEAD; + } else { + throw new UnsupportedMethodZMQException(m); + } + + // URI + this.uri = s[1]; + + if ((this.uri == null) || ("".equals(this.uri)) || "null".equalsIgnoreCase(this.uri)) { + throw new NoURIFoundZMQException(); + } + + int pathEndPos = uri.indexOf('?'); + if (pathEndPos < 0) { + this.rawPath = uri; + } else { + this.rawPath = uri.substring(0, pathEndPos); + RestUtils.decodeQueryString(uri, pathEndPos + 1, params); + } + + // Content + int indexContent = payload.indexOf(ZMQSocket.SEPARATOR, m.length() + uri.length()); + body = ByteBuffer.wrap(payload.substring(indexContent + 1).getBytes()); + } + } + + @Override + public Method method() { + return this.method; + } + + @Override + public String uri() { + return this.uri; + } + + @Override + public String rawPath() { + return this.rawPath; + } + + @Override + public boolean hasContent() { + return ((body != null) && (body.remaining() > 0)); + } + + @Override + public boolean contentUnsafe() { + return false; + } + + @Override + public String header(String name) { + return null; + } + + @Override + public boolean hasParam(String key) { + return params.containsKey(key); + } + + @Override + public String param(String key) { + + String p = params.get(key); + return p; + } + + @Override + public Map params() { + return params; + } + + @Override + public String param(String key, String defaultValue) { + String value = params.get(key); + if (value == null) { + return defaultValue; + } + return value; + } + + @Override + public BytesReference content() { + BytesArray bArray = new BytesArray(body.array()); + return bArray; + } + + @Override + public Iterable> headers() { + return (Iterable>) params; + } } diff --git a/src/main/java/org/elasticsearch/zeromq/ZMQRestResponse.java b/src/main/java/org/elasticsearch/zeromq/ZMQRestResponse.java index 2018c94..251ee23 100644 --- a/src/main/java/org/elasticsearch/zeromq/ZMQRestResponse.java +++ b/src/main/java/org/elasticsearch/zeromq/ZMQRestResponse.java @@ -3,93 +3,97 @@ import java.io.IOException; import java.nio.ByteBuffer; -import org.elasticsearch.common.Bytes; import org.elasticsearch.rest.AbstractRestResponse; import org.elasticsearch.rest.RestStatus; /** * @author tlrx - * + * */ public class ZMQRestResponse extends AbstractRestResponse { - private final RestStatus status; - - public ByteBuffer body; - - private String contentType; - - public ZMQRestResponse(RestStatus status) { - super(); - this.status = status; - } - - @Override - public String contentType() { - return contentType; - } - - public ZMQRestResponse setBody(ByteBuffer body) { - this.body = body; - return this; - } - - @Override - public byte[] content() throws IOException { - if (body == null) { - return Bytes.EMPTY_ARRAY; - } - return body.array(); - } - - @Override - public int contentLength() throws IOException { - if (body == null) { - return 0; - } - return body.remaining(); - } - - @Override - public RestStatus status() { - return status; - } - - @Override - public boolean contentThreadSafe() { - return false; - } - - public void setContentType(String contentType) { - this.contentType = contentType; - } - - /** - * @return the payload to reply to the client - * @throws IOException - */ - public byte[] payload() { - - // TODO optimise & challenge thoses lines... - ByteBuffer bStatusCode = ByteBuffer.wrap(Integer.toString(this.status.getStatus()).getBytes()); - ByteBuffer bStatusName = ByteBuffer.wrap(this.status.name().getBytes()); - ByteBuffer bSep1 = ByteBuffer.wrap(ZMQSocket.SEPARATOR.getBytes()); - ByteBuffer bSep2 = ByteBuffer.wrap(ZMQSocket.SEPARATOR.getBytes()); - ByteBuffer bContent = null; - - try { - bContent = ByteBuffer.wrap(content()); - } catch (Exception e) { - bContent = ByteBuffer.wrap(e.getMessage().getBytes()); - } - - ByteBuffer payload = ByteBuffer.allocate(bStatusCode.limit() + bSep1.limit() + bStatusName.limit() + bSep2.limit() + bContent.limit()); - payload.put(bStatusCode); - payload.put(bSep1); - payload.put(bStatusName); - payload.put(bSep2); - payload.put(bContent); - - return payload.array(); - } + private final RestStatus status; + + public ByteBuffer body; + + private String contentType; + + public ZMQRestResponse(RestStatus status) { + super(); + this.status = status; + } + + @Override + public String contentType() { + return contentType; + } + + public ZMQRestResponse setBody(ByteBuffer body) { + this.body = body; + return this; + } + + @Override + public byte[] content() throws IOException { + if (body == null) { + byte[] emptyArray = new byte[0]; + return emptyArray; + } + return body.array(); + } + + @Override + public int contentLength() throws IOException { + if (body == null) { + return 0; + } + return body.remaining(); + } + + @Override + public RestStatus status() { + return status; + } + + @Override + public boolean contentThreadSafe() { + return false; + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + /** + * @return the payload to reply to the client + */ + public byte[] payload() { + + // TODO optimise & challenge thoses lines... + ByteBuffer bStatusCode = ByteBuffer.wrap(Integer.toString(this.status.getStatus()).getBytes()); + ByteBuffer bStatusName = ByteBuffer.wrap(this.status.name().getBytes()); + ByteBuffer bSep1 = ByteBuffer.wrap(ZMQSocket.SEPARATOR.getBytes()); + ByteBuffer bSep2 = ByteBuffer.wrap(ZMQSocket.SEPARATOR.getBytes()); + ByteBuffer bContent = null; + + try { + bContent = ByteBuffer.wrap(content()); + } catch (Exception e) { + bContent = ByteBuffer.wrap(e.getMessage().getBytes()); + } + + ByteBuffer payload = ByteBuffer.allocate(bStatusCode.limit() + bSep1.limit() + bStatusName.limit() + bSep2.limit() + bContent.limit()); + payload.put(bStatusCode); + payload.put(bSep1); + payload.put(bStatusName); + payload.put(bSep2); + payload.put(bContent); + + return payload.array(); + } + + @Override + public int contentOffset() throws IOException { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } } diff --git a/src/main/java/org/elasticsearch/zeromq/ZMQServer.java b/src/main/java/org/elasticsearch/zeromq/ZMQServer.java index dba97a3..82ab405 100644 --- a/src/main/java/org/elasticsearch/zeromq/ZMQServer.java +++ b/src/main/java/org/elasticsearch/zeromq/ZMQServer.java @@ -19,7 +19,7 @@ package org.elasticsearch.zeromq; -import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -46,7 +46,7 @@ public ZMQServer(Settings settings, NodeService nodeService, } @Override - protected void doStart() throws ElasticSearchException { + protected void doStart() throws ElasticsearchException { logger.debug("Starting ØMQ server..."); daemonThreadFactory(settings, "zeromq_server").newThread( @@ -61,13 +61,13 @@ public void run() { @Override - protected void doStop() throws ElasticSearchException { + protected void doStop() throws ElasticsearchException { nodeService.removeAttribute("zeromq_address"); transport.stop(); } @Override - protected void doClose() throws ElasticSearchException { + protected void doClose() throws ElasticsearchException { transport.close(); } } diff --git a/src/main/java/org/elasticsearch/zeromq/impl/ZMQQueueServerImpl.java b/src/main/java/org/elasticsearch/zeromq/impl/ZMQQueueServerImpl.java index d248414..ff6dc72 100644 --- a/src/main/java/org/elasticsearch/zeromq/impl/ZMQQueueServerImpl.java +++ b/src/main/java/org/elasticsearch/zeromq/impl/ZMQQueueServerImpl.java @@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.network.NetworkService; @@ -94,7 +94,7 @@ protected ZMQQueueServerImpl(Settings settings, NodeService nodeService, ZMQRest } @Override - protected void doStart() throws ElasticSearchException { + protected void doStart() throws ElasticsearchException { logger.debug("Starting ØMQ dealer socket..."); dealer = context.socket(ZMQ.DEALER); @@ -144,7 +144,7 @@ protected void doStart() throws ElasticSearchException { } @Override - protected void doClose() throws ElasticSearchException { + protected void doClose() throws ElasticsearchException { logger.info("Closing ØMQ server..."); // After next incoming message, sockets will close themselves @@ -183,7 +183,7 @@ protected void doClose() throws ElasticSearchException { } @Override - protected void doStop() throws ElasticSearchException { + protected void doStop() throws ElasticsearchException { logger.debug("Stopping ØMQ server..."); } From e988307d55dd7084e48a613cb2f73c4366ca4526 Mon Sep 17 00:00:00 2001 From: "Ben.Aldrich" Date: Wed, 18 Jun 2014 11:34:36 -0600 Subject: [PATCH 2/5] fix unit tests --- .../zeromq/test/ZMQTransportPluginTest.java | 337 +++++++++--------- 1 file changed, 177 insertions(+), 160 deletions(-) diff --git a/src/test/java/org/elasticsearch/zeromq/test/ZMQTransportPluginTest.java b/src/test/java/org/elasticsearch/zeromq/test/ZMQTransportPluginTest.java index 7a8442b..bc52777 100644 --- a/src/test/java/org/elasticsearch/zeromq/test/ZMQTransportPluginTest.java +++ b/src/test/java/org/elasticsearch/zeromq/test/ZMQTransportPluginTest.java @@ -12,174 +12,191 @@ import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.zeromq.ZMQSocket; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.zeromq.ZMQ; public class ZMQTransportPluginTest { - - private static Node node = null; - - private static ZMQ.Context context = null; - - /* - * ØMQ Socket binding adress, must be coherent with elasticsearch.yml config file - */ - private static final String address = "tcp://localhost:9800"; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // Instantiate an ES server - node = NodeBuilder.nodeBuilder() - .settings( - ImmutableSettings.settingsBuilder() - .put("es.config", "elasticsearch.yml") - ).node(); - - // Instantiate a ZMQ context - context = ZMQ.context(1); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - if(node != null){ - node.close(); - } - - try { - context.term(); - } catch (Exception e2) { - // ignore - } - } - /** - * Simple method to send & receive zeromq message - * - * @param method - * @param uri - * @param json - * @return - */ - private String sendAndReceive(String method, String uri, String json){ - - ZMQ.Socket socket = context.socket(ZMQ.DEALER); - socket.connect(address); - - // Handshake - try { - Thread.sleep(100); - } catch (Exception e) { - Assert.fail("Handshake failed"); - } - - StringBuilder sb = new StringBuilder(method); - sb.append(ZMQSocket.SEPARATOR).append(uri).append(ZMQSocket.SEPARATOR); - - if(json != null){ - sb.append(json); - } - - String result = null; - try { - socket.send(sb.toString().getBytes("UTF-8"), 0); - - byte[] response = socket.recv(0); - result = new String(response, Charset.forName("UTF-8")); - - } catch (UnsupportedEncodingException e) { - Assert.fail("Exception when sending/receiving message"); - } finally { - try { - socket.close(); - } catch (Exception e2) { - // ignore - } - } - return result; - } - - - @Test - public void testDeleteMissingIndex(){ - String response = sendAndReceive("DELETE", "/test-index-missing/", null); - Assert.assertEquals("404|NOT_FOUND|{\"error\":\"IndexMissingException[[test-index-missing] missing]\",\"status\":404}", response); - } - - @Test - public void testCreateIndex(){ - String response = sendAndReceive("DELETE", "/books/", null); - Assert.assertNotNull(response); - - response = sendAndReceive("PUT", "/books/", null); - Assert.assertEquals("200|OK|{\"ok\":true,\"acknowledged\":true}", response); - } - - @Test - public void testMapping() throws IOException{ - XContentBuilder mapping = jsonBuilder() - .startObject() - .startObject("book") - .startObject("properties") - .startObject("title") - .field("type", "string") - .field("analyzer", "french") - .endObject() - .startObject("author") - .field("type", "string") - .endObject() - .startObject("year") - .field("type", "integer") - .endObject() - .startObject("publishedDate") - .field("type", "date") - .endObject() - .endObject() - .endObject() - .endObject(); - - String response = sendAndReceive("PUT", "/books/book/_mapping", mapping.string()); - Assert.assertEquals("200|OK|{\"ok\":true,\"acknowledged\":true}", response); - } - - @Test - public void testIndex() throws IOException{ - XContentBuilder book1 = jsonBuilder() - .startObject() - .field("title", "Les Misérables") - .field("author", "Victor Hugo") - .field("year", "1862") - .field("publishedDate", new Date()) - .endObject(); - - String response = sendAndReceive("PUT", "/books/book/1", book1.string()); - Assert.assertEquals("201|CREATED|{\"ok\":true,\"_index\":\"books\",\"_type\":\"book\",\"_id\":\"1\",\"_version\":1}", response); - - XContentBuilder book2 = jsonBuilder() - .startObject() - .field("title", "Notre-Dame de Paris") - .field("author", "Victor Hugo") - .field("year", "1831") - .field("publishedDate", new Date()) - .endObject(); - - response = sendAndReceive("PUT", "/books/book/2", book2.string()); - Assert.assertEquals("201|CREATED|{\"ok\":true,\"_index\":\"books\",\"_type\":\"book\",\"_id\":\"2\",\"_version\":1}", response); - - XContentBuilder book3 = jsonBuilder() - .startObject() - .field("title", "Le Dernier Jour d'un condamné") - .field("author", "Victor Hugo") - .field("year", "1829") - .field("publishedDate", new Date()) - .endObject(); - - response = sendAndReceive("POST", "/books/book", book3.string()); - Assert.assertNotNull("Response should not be null", response); - Assert.assertTrue(response.startsWith("201|CREATED|{\"ok\":true,\"_index\":\"books\",\"_type\":\"book\",\"_id\"")); - } + private static Node node = null; + protected ZMQ.Socket socket = null; + + private static ZMQ.Context context = null; + + /* + * ØMQ Socket binding adress, must be coherent with elasticsearch.yml config file + */ + private static final String address = "tcp://localhost:9800"; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Instantiate an ES server + node = NodeBuilder.nodeBuilder() + .settings( + ImmutableSettings.settingsBuilder() + .put("es.config", "elasticsearch.yml") + ).node(); + + // Instantiate a ZMQ context + context = ZMQ.context(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (node != null) { + node.close(); + } + + try { + context.term(); + } catch (Exception e2) { + // ignore + } + } + + @Before + public void setUp() { + socket = context.socket(ZMQ.DEALER); + socket.setHWM(100); + socket.setRcvHWM(100); + socket.setSndHWM(100); + socket.connect(address); + // Handshake + try { + Thread.sleep(100); + } catch (Exception e) { + Assert.fail("Handshake failed"); + } + } + + @After + public void tearDown() { + try { + if (null != socket) { + socket.close(); + Thread.sleep(500); + } + + } catch (Exception e2) { + // ignore + } + } + + /** + * Simple method to send & receive zeromq message + * + * @param method + * @param uri + * @param json + * @return + */ + private String sendAndReceive(String method, String uri, String json) { + + StringBuilder sb = new StringBuilder(method); + sb.append(ZMQSocket.SEPARATOR).append(uri).append(ZMQSocket.SEPARATOR); + + if (json != null) { + sb.append(json); + } + + String result = null; + try { + socket.send(sb.toString().getBytes("UTF-8"), 0); + + byte[] response = socket.recv(0); + result = new String(response, Charset.forName("UTF-8")); + + } catch (UnsupportedEncodingException e) { + Assert.fail("Exception when sending/receiving message"); + } + return result; + } + + @Test + public void testDeleteMissingIndex() { + String response = sendAndReceive("DELETE", "/test-index-missing/", null); + + Assert.assertTrue(response.startsWith("404|NOT_FOUND|{\"error\":\"IndexMissingException[[test-index-missing] missing]\",\"status\":404}")); + + } + + @Test + public void testCreateIndex() { + String response = sendAndReceive("DELETE", "/books/", null); + Assert.assertNotNull(response); + + response = sendAndReceive("PUT", "/books/", null); + Assert.assertTrue(response.startsWith("200|OK|{\"acknowledged\":true}")); + } + + @Test + public void testMapping() throws IOException { + XContentBuilder mapping = jsonBuilder() + .startObject() + .startObject("book") + .startObject("properties") + .startObject("title") + .field("type", "string") + .field("analyzer", "french") + .endObject() + .startObject("author") + .field("type", "string") + .endObject() + .startObject("year") + .field("type", "integer") + .endObject() + .startObject("publishedDate") + .field("type", "date") + .endObject() + .endObject() + .endObject() + .endObject(); + + String response = sendAndReceive("PUT", "/books/book/_mapping", mapping.string()); + System.out.println(response); + Assert.assertTrue(response.startsWith("200|OK|{\"acknowledged\":true}")); + } + + @Test + public void testIndex() throws IOException { + XContentBuilder book1 = jsonBuilder() + .startObject() + .field("title", "Les Misérables") + .field("author", "Victor Hugo") + .field("year", "1862") + .field("publishedDate", new Date()) + .endObject(); + + String response = sendAndReceive("PUT", "/books/book/1", book1.string()); + Assert.assertTrue(response.startsWith("201|CREATED|{\"_index\":\"books\",\"_type\":\"book\",\"_id\":\"1\",\"_version\":1,\"created\":true}")); + + XContentBuilder book2 = jsonBuilder() + .startObject() + .field("title", "Notre-Dame de Paris") + .field("author", "Victor Hugo") + .field("year", "1831") + .field("publishedDate", new Date()) + .endObject(); + + response = sendAndReceive("PUT", "/books/book/2", book2.string()); + Assert.assertTrue(response.startsWith("201|CREATED|{\"_index\":\"books\",\"_type\":\"book\",\"_id\":\"2\",\"_version\":1,\"created\":true}")); + + XContentBuilder book3 = jsonBuilder() + .startObject() + .field("title", "Le Dernier Jour d'un condamné") + .field("author", "Victor Hugo") + .field("year", "1829") + .field("publishedDate", new Date()) + .endObject(); + + response = sendAndReceive("POST", "/books/book", book3.string()); + Assert.assertNotNull("Response should not be null", response); + Assert.assertTrue(response.startsWith("201|CREATED|{\"_index\":\"books\",\"_type\":\"book\"")); + } @Test public void testRefresh() throws IOException{ From e457f821afa8b02a31d18d8198ac71cf062fda82 Mon Sep 17 00:00:00 2001 From: "Ben.Aldrich" Date: Wed, 18 Jun 2014 11:35:59 -0600 Subject: [PATCH 3/5] upgrade version --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 8610a9f..541fdcc 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.elasticsearch.plugin transport-zeromq jar - 0.0.5-SNAPSHOT + 0.0.6-SNAPSHOT ØMQ transport layer plugin for Elasticsearch https://github.com/itiu/transport-zeromq @@ -174,4 +174,4 @@ file:///tmp/local_tmp_repo/ - \ No newline at end of file + From 8c2b288782165bb23205d53e7790a0b94b5b1bfa Mon Sep 17 00:00:00 2001 From: Ryan Kophs Date: Wed, 31 Dec 2014 12:49:56 -0700 Subject: [PATCH 4/5] modified to fit the ES changes --- pom.xml | 2 +- .../org/elasticsearch/zeromq/ZMQRestImpl.java | 12 ++++---- .../elasticsearch/zeromq/ZMQRestResponse.java | 28 ++++++------------- 3 files changed, 16 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index 541fdcc..02fc374 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ https://github.com/itiu/transport-zeromq - 1.1.2 + 1.4.2 diff --git a/src/main/java/org/elasticsearch/zeromq/ZMQRestImpl.java b/src/main/java/org/elasticsearch/zeromq/ZMQRestImpl.java index 36e0184..bc8f03b 100644 --- a/src/main/java/org/elasticsearch/zeromq/ZMQRestImpl.java +++ b/src/main/java/org/elasticsearch/zeromq/ZMQRestImpl.java @@ -50,13 +50,13 @@ public ZMQRestResponse process(ZMQRestRequest request){ final CountDownLatch latch = new CountDownLatch(1); final AtomicReference ref = new AtomicReference(); - this.restController.dispatchRequest(request, new RestChannel() { + this.restController.dispatchRequest(request, new RestChannel(request) { @Override public void sendResponse(RestResponse response) { try { if(logger.isTraceEnabled()){ - logger.info("Response to ØMQ client: {}", new String(response.content())); + logger.info("Response to ØMQ client: {}", new String(response.content().toBytes())); } ref.set(convert(response)); } catch (IOException e) { @@ -80,13 +80,13 @@ private ZMQRestResponse convert(RestResponse response) throws IOException { if(response.contentType() != null){ zmqResponse.setContentType(response.contentType()); } - if (response.contentLength() > 0) { + if (response.content().length() > 0) { if (response.contentThreadSafe()) { - zmqResponse.setBody(ByteBuffer.wrap(response.content(), 0, response.contentLength())); + zmqResponse.setBody(ByteBuffer.wrap(response.content().toBytes(), 0, response.content().length())); } else { // argh!, we need to copy it over since we are not on the same thread... - byte[] body = new byte[response.contentLength()]; - System.arraycopy(response.content(), 0, body, 0, response.contentLength()); + byte[] body = new byte[response.content().length()]; + System.arraycopy(response.content(), 0, body, 0, response.content().length()); zmqResponse.setBody(ByteBuffer.wrap(body)); } } diff --git a/src/main/java/org/elasticsearch/zeromq/ZMQRestResponse.java b/src/main/java/org/elasticsearch/zeromq/ZMQRestResponse.java index 251ee23..be7d65d 100644 --- a/src/main/java/org/elasticsearch/zeromq/ZMQRestResponse.java +++ b/src/main/java/org/elasticsearch/zeromq/ZMQRestResponse.java @@ -3,14 +3,16 @@ import java.io.IOException; import java.nio.ByteBuffer; -import org.elasticsearch.rest.AbstractRestResponse; +import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.BytesArray; /** * @author tlrx * */ -public class ZMQRestResponse extends AbstractRestResponse { +public class ZMQRestResponse extends RestResponse { private final RestStatus status; @@ -34,20 +36,13 @@ public ZMQRestResponse setBody(ByteBuffer body) { } @Override - public byte[] content() throws IOException { + public BytesReference content() { + if (body == null) { byte[] emptyArray = new byte[0]; - return emptyArray; + return new BytesArray(emptyArray); } - return body.array(); - } - - @Override - public int contentLength() throws IOException { - if (body == null) { - return 0; - } - return body.remaining(); + return new BytesArray(body.array()); } @Override @@ -77,7 +72,7 @@ public byte[] payload() { ByteBuffer bContent = null; try { - bContent = ByteBuffer.wrap(content()); + bContent = ByteBuffer.wrap(content().toBytes()); } catch (Exception e) { bContent = ByteBuffer.wrap(e.getMessage().getBytes()); } @@ -91,9 +86,4 @@ public byte[] payload() { return payload.array(); } - - @Override - public int contentOffset() throws IOException { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } } From 5a14c019a63bd35b33efe454f9d8c81ed32b2578 Mon Sep 17 00:00:00 2001 From: Ryan Kophs Date: Wed, 31 Dec 2014 13:30:44 -0700 Subject: [PATCH 5/5] 1.4.2 seems to be unstable, 1.4.1 works --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 02fc374..befa1c6 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ https://github.com/itiu/transport-zeromq - 1.4.2 + 1.4.1