Skip to content
This repository has been archived by the owner on May 4, 2019. It is now read-only.

Commit

Permalink
Merge branch 'release/0.8.10'
Browse files Browse the repository at this point in the history
  • Loading branch information
rdegnan committed Sep 4, 2018
2 parents c49718f + 3210e5e commit c2f1ec4
Show file tree
Hide file tree
Showing 80 changed files with 2,258 additions and 6,599 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Netifi Proteus Java

<a href='https://travis-ci.org/netifi-proteus/proteus-java'><img src='https://travis-ci.org/netifi-proteus/proteus-java.svg?branch=master'></a>
[![Join the chat at https://gitter.im/netifi/general](https://badges.gitter.im/netifi/general.svg)](https://gitter.im/netifi/general) <a href='https://travis-ci.org/netifi-proteus/proteus-java'><img src='https://travis-ci.org/netifi-proteus/proteus-java.svg?branch=master'></a>


## Build from Source
Expand Down
48 changes: 17 additions & 31 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,37 +46,16 @@ project(':broker-mgmt-idl') {
apply from: file('../gradle/java.gradle')
}

project(':client') {
description = 'Netifi Proteus Client'
ext.artifactName = 'client'

apply from: file('../gradle/java.gradle')
}

project(':core') {
description = 'Netifi Core Library'
ext.artifactName = 'core'

apply from: file('../gradle/java.gradle')
}

project(':core-idl') {
description = 'Netifi Core IDL'
ext.artifactName = 'core-idl'

apply from: file('../gradle/java.gradle')
}

project(':frames') {
description = 'Netifi Proteus Frames'
ext.artifactName = 'frames'

apply from: file('../gradle/java.gradle')
}

project(':metrics-idl') {
description = 'Netifi Proteus Metrics IDL'
ext.artifactName = 'metrics-idl'
project(':client') {
description = 'Netifi Proteus Client'
ext.artifactName = 'client'

apply from: file('../gradle/java.gradle')
}
Expand All @@ -95,13 +74,6 @@ project(':metrics-prometheus') {
apply from: file('../gradle/java.gradle')
}

project(':protobuf-rpc') {
description = 'Netifi Proteus RPC'
ext.artifactName = 'protobuf-rpc'

//apply from: file('../gradle/java.gradle')
}

project(':tracing-openzipkin') {
description = 'Netifi Proteus Openzipkin Integration'
ext.artifactName = 'tracing-openzipkin'
Expand All @@ -116,3 +88,17 @@ project(':tracing-idl') {

apply from: file('../gradle/java.gradle')
}

project(':viz-idl') {
description = 'Netifi Proteus Vizceral IDL'
ext.artifactName = 'viz-idl'

apply from: file('../gradle/java.gradle')
}

project(':vizceral') {
description = 'Netifi Proteus Vizceral Service'
ext.artifactName = 'vizceral'

apply from: file('../gradle/java.gradle')
}
28 changes: 16 additions & 12 deletions client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,45 @@ targetCompatibility = 1.8
dependencies {
protobuf project (':broker-info-idl')

compile project (':core')
compile 'io.rsocket.rpc:core:0.1.2'
compile 'io.rsocket.rpc:frames:0.1.2'
compile project (':auth')
compile project (':frames')

compile 'com.typesafe:config:1.3.2'
compile 'com.google.guava:guava:22.0'
compile 'io.netty:netty-tcnative:2.0.12.Final:linux-x86_64'

testProtobuf project (':core-idl')
testProtobuf project (':protobuf-rpc')
testCompile 'io.micrometer:micrometer-registry-atlas:1.0.3'
testProtobuf 'io.rsocket.rpc:core-idl:0.1.2'
testCompile 'io.rsocket.rpc:core:0.1.2'
testCompile project(':tracing-openzipkin')
testCompile project(':metrics-micrometer')
testCompile 'javax.inject:javax.inject:1'
testCompile 'junit:junit:4.12'
testCompile 'org.apache.logging.log4j:log4j-api:2.9.0'
testCompile 'org.apache.logging.log4j:log4j-core:2.9.0'
testCompile 'org.apache.logging.log4j:log4j-slf4j-impl:2.9.0'
}

def protocPluginBaseName = "proteus-java-${osdetector.os}-${osdetector.arch}"
def javaPluginPath = "$rootDir/protobuf-rpc/build/exe/java_plugin/$protocPluginBaseName"

protobuf {
generatedFilesBaseDir = "${projectDir}/src/generated"

protoc {
artifact = 'com.google.protobuf:protoc:3.6.0'
}
plugins {
proteus {
path = javaPluginPath
rsocketRpc {
artifact = 'io.rsocket.rpc:protobuf-rpc:0.1.2'
}
}
generateProtoTasks {
all().each { task ->
task.dependsOn ':protobuf-rpc:java_pluginExecutable'
// Recompile protos when the codegen has been changed
task.inputs.file javaPluginPath
// Recompile protos when build.gradle has been changed, because
// it's possible the version of protoc has been changed.
task.inputs.file "${rootProject.projectDir}/build.gradle"
task.plugins {
proteus {}
rsocketRpc {}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import io.netifi.proteus.presence.PresenceNotifier;
import io.netifi.proteus.rsocket.*;
import io.netifi.proteus.rsocket.transport.WeightedClientTransportSupplier;
import io.netifi.proteus.stats.FrugalQuantile;
import io.netifi.proteus.stats.Quantile;
import io.rsocket.rpc.stats.FrugalQuantile;
import io.rsocket.rpc.stats.Quantile;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
Expand All @@ -29,11 +29,13 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.netifi.proteus.rsocket.UnwrappingRSocket;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class DefaultProteusBrokerService implements ProteusBrokerService, Disposable {
Expand Down
60 changes: 43 additions & 17 deletions client/src/main/java/io/netifi/proteus/Proteus.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package io.netifi.proteus;

import io.micrometer.core.instrument.MeterRegistry;
import io.netifi.proteus.rsocket.ProteusSocket;
import io.netifi.proteus.rsocket.RequestHandlingRSocket;
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.rsocket.rpc.RSocketRpcService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.opentracing.Tracer;
import io.rsocket.Closeable;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.ipc.netty.tcp.TcpClient;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand Down Expand Up @@ -94,7 +101,7 @@ public Mono<Void> onClose() {
return onClose;
}

public Proteus addService(ProteusService service) {
public Proteus addService(RSocketRpcService service) {
requestHandlingRSocket.addService(service);
return this;
}
Expand Down Expand Up @@ -134,10 +141,7 @@ public static class Builder {
private int missedAcks = DefaultBuilderConfig.getMissedAcks();
private DestinationNameFactory destinationNameFactory;

private MeterRegistry registry = null;
private int batchSize = DefaultBuilderConfig.getBatchSize();
private Function<SocketAddress, ClientTransport> clientTransportFactory =
address -> TcpClientTransport.create((InetSocketAddress) address);
private Function<SocketAddress, ClientTransport> clientTransportFactory = null;
private int poolSize = Runtime.getRuntime().availableProcessors();
private Supplier<Tracer> tracerSupplier = () -> null;

Expand All @@ -152,13 +156,8 @@ public Builder poolSize(int poolSize) {
return this;
}

public Builder metricBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}

public Builder keepalive(boolean useKeepAlive) {
this.keepalive = keepalive;
this.keepalive = useKeepAlive;
return this;
}

Expand Down Expand Up @@ -196,9 +195,7 @@ public Builder seedAddresses(Collection<SocketAddress> addresses) {
if (addresses instanceof List) {
this.seedAddresses = (List<SocketAddress>) addresses;
} else {
List<SocketAddress> list = new ArrayList<>();
list.addAll(addresses);
this.seedAddresses = list;
this.seedAddresses = new ArrayList<>(addresses);
}

return this;
Expand Down Expand Up @@ -245,6 +242,35 @@ public Proteus build() {
Objects.requireNonNull(accessToken, "account token is required");
Objects.requireNonNull(group, "group is required");

if (clientTransportFactory == null) {
logger.info("Client transport factory not provided; using TCP transport.");
try {
final SslProvider sslProvider;
if (OpenSsl.isAvailable()) {
logger.info("Native SSL provider is available; will use native provider.");
sslProvider = SslProvider.OPENSSL_REFCNT;
} else {
logger.info("Native SSL provider not available; will use JDK SSL provider.");
sslProvider = SslProvider.JDK;
}
final SslContext sslContext =
SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.sslProvider(sslProvider)
.build();
clientTransportFactory = address -> {
TcpClient client =
TcpClient.create(
opts ->
opts.connectAddress(() -> address)
.sslContext(sslContext));
return TcpClientTransport.create(client);
};
} catch (Exception sslException) {
throw Exceptions.bubble(sslException);
}
}

this.accessTokenBytes = Base64.getDecoder().decode(accessToken);

if (destinationNameFactory == null) {
Expand All @@ -259,7 +285,7 @@ public Proteus build() {
if (seedAddresses == null) {
Objects.requireNonNull(host, "host is required");
Objects.requireNonNull(port, "port is required");
socketAddresses = Arrays.asList(InetSocketAddress.createUnresolved(host, port));
socketAddresses = Collections.singletonList(InetSocketAddress.createUnresolved(host, port));
} else {
socketAddresses = seedAddresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void watch(String group) {
}
}
})
.onErrorResume(err -> Mono.delay(Duration.ofMillis(500)).then(Mono.error(err)))
.retry()
.subscribe(this::joinEvent));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package io.netifi.proteus.rsocket;

import io.netifi.proteus.frames.*;
import io.netty.buffer.ByteBuf;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.netifi.proteus.frames.*;
import io.rsocket.util.ByteBufPayload;
import io.rsocket.util.RSocketProxy;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Need to unwrap Proteus Messages
// Need to unwrap RSocketRpc Messages
public class UnwrappingRSocket extends RSocketProxy {

public UnwrappingRSocket(RSocket source) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.netifi.proteus.rsocket;

import io.netifi.proteus.DestinationNameFactory;
import io.netifi.proteus.exception.TimeoutException;
import io.rsocket.rpc.exception.TimeoutException;
import io.netifi.proteus.rsocket.transport.WeightedClientTransportSupplier;
import io.netifi.proteus.stats.Ewma;
import io.netifi.proteus.stats.Median;
import io.netifi.proteus.stats.Quantile;
import io.rsocket.rpc.stats.Ewma;
import io.rsocket.rpc.stats.Median;
import io.rsocket.rpc.stats.Quantile;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.rsocket.*;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.netifi.proteus.rsocket.transport;

import io.netifi.proteus.rsocket.WeightedRSocket;
import io.netifi.proteus.stats.Ewma;
import io.rsocket.rpc.stats.Ewma;
import io.rsocket.Closeable;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.Clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
@Ignore
public class ProteusIntegrationTest {

private static final long accessKey = 3855261330795754807L;
private static final long accessKey = 9007199254740991L;
private static final String accessToken = "kTBDVtfRBO4tHOnZzSyY5ym2kfY=";
private static final String host = "localhost";
private static final String host = "edge.netifi.io";
private static final int port = 8001;
private static final int server_port = 8001;
private static Proteus server;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public boolean enabled() {
}
});
ProteusMetricsExporter exporter = new ProteusMetricsExporter(client, registry);
MetricsExporter exporter = new MetricsExporter(client, registry);
exporter.run();
Counter test = registry.counter("test");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.netifi.proteus.rsocket;

import io.netifi.proteus.DestinationNameFactory;
import io.netifi.proteus.stats.FrugalQuantile;
import io.rsocket.rpc.stats.FrugalQuantile;
import io.netty.buffer.Unpooled;
import io.rsocket.RSocket;
import org.junit.Test;
Expand Down
26 changes: 0 additions & 26 deletions core-idl/build.gradle

This file was deleted.

Loading

0 comments on commit c2f1ec4

Please sign in to comment.