diff --git a/sgx_network_simulation/Dockerfile b/sgx_network_simulation/Dockerfile new file mode 100644 index 000000000..279b3ac5c --- /dev/null +++ b/sgx_network_simulation/Dockerfile @@ -0,0 +1,33 @@ +FROM golang:1.16 AS go + +RUN apt-get update && \ + apt-get install -y make g++ libgmp-dev libglib2.0-dev libssl-dev && \ + apt-get install -y protobuf-compiler && \ + apt-get clean + +WORKDIR /app +COPY tools/tcp_grpc_proxy ./ +RUN make build + +FROM python:3.6.8 + +RUN echo "deb http://archive.debian.org/debian stretch main contrib non-free" > /etc/apt/sources.list + +RUN apt-get update && \ + apt-get install -y curl vim make nginx && \ + apt-get clean + +# upgrade nginx +RUN echo "deb http://nginx.org/packages/mainline/debian/ stretch nginx deb-src http://nginx.org/packages/mainline/debian/ stretch nginx" > /etc/apt/sources.list.d/nginx.list +RUN wget -qO - https://nginx.org/keys/nginx_signing.key | apt-key add - +RUN apt update && \ + apt remove nginx-common -y && \ + apt install nginx + +COPY sgx_network_simulation/ /app/ +WORKDIR /app +COPY --from=go /app/tcp2grpc ./ +COPY --from=go /app/grpc2tcp ./ +RUN pip3 install -r requirements.txt && make protobuf + +ENTRYPOINT ["bash", "docker_entrypoint.sh"] diff --git a/sgx_network_simulation/nginx/sidecar.conf b/sgx_network_simulation/nginx/sidecar.conf new file mode 100644 index 000000000..2586392d2 --- /dev/null +++ b/sgx_network_simulation/nginx/sidecar.conf @@ -0,0 +1,22 @@ +# Forwards all traffic to nginx controller +server { + listen 32102 http2; + + # No limits + client_max_body_size 0; + grpc_read_timeout 3600s; + grpc_send_timeout 3600s; + client_body_timeout 3600s; + # grpc_socket_keepalive is recommended but not required + # grpc_socket_keepalive is supported after nginx 1.15.6 + grpc_socket_keepalive on; + + grpc_set_header Authority fl-bytedance-client-auth.com; + grpc_set_header Host fl-bytedance-client-auth.com; + grpc_set_header X-Host sgx-test.fl-cmcc.com; + + location / { + # Redirects to nginx controller + grpc_pass grpc://fedlearner-stack-ingress-nginx-controller.default.svc:80; + } +} diff --git a/sgx_network_simulation/sidecar.sh b/sgx_network_simulation/sidecar.sh new file mode 100644 index 000000000..dae4a124b --- /dev/null +++ b/sgx_network_simulation/sidecar.sh @@ -0,0 +1,66 @@ +#!/bin/bash +set -ex + +FILE_PATH="/pod-data/listen_port" +while [ ! -s "$FILE_PATH" ]; do + echo "wait for $FILE_PATH ..." + sleep 1 +done +WORKER_LISTEN_PORT=$(cat "$FILE_PATH") + +echo "# Forwards all traffic to nginx controller +server { + listen 32102 http2; + + # No limits + client_max_body_size 0; + grpc_read_timeout 3600s; + grpc_send_timeout 3600s; + client_body_timeout 3600s; + # grpc_socket_keepalive is recommended but not required + # grpc_socket_keepalive is supported after nginx 1.15.6 + grpc_socket_keepalive on; + + grpc_set_header Authority ${EGRESS_HOST}; + grpc_set_header Host ${EGRESS_HOST}; + grpc_set_header X-Host ${SERVICE_ID}.${EGRESS_DOMAIN}; + + location / { + # Redirects to nginx controller + grpc_pass grpc://fedlearner-stack-ingress-nginx-controller.default.svc:80; + } +} +" > nginx/sidecar.conf + +if [ -z "$PORT0" ]; then + PORT0=32001 +fi + +if [ -z "$PORT2" ]; then + PORT2=32102 +fi + +sed -i "s/listen [0-9]* http2;/listen $PORT2 http2;/" nginx/sidecar.conf + +cp nginx/sidecar.conf /etc/nginx/conf.d/ +service nginx restart + +# Server sidecar: grpc to tcp, 5001 is the server port of main container +echo "Starting server sidecar" +./grpc2tcp --grpc_server_port=$PORT0 \ + --target_tcp_address="localhost:$WORKER_LISTEN_PORT" & + +echo "Starting client sidecar" +./tcp2grpc --tcp_server_port="$PROXY_LOCAL_PORT" \ + --target_grpc_address="localhost:$PORT2" & + +echo "===========Sidecar started!!=============" + +while true +do + if [[ -f "/pod-data/main-terminated" ]] + then + exit 0 + fi + sleep 5 +done diff --git a/tools/tcp_grpc_proxy/Makefile b/tools/tcp_grpc_proxy/Makefile new file mode 100644 index 000000000..67e1889f9 --- /dev/null +++ b/tools/tcp_grpc_proxy/Makefile @@ -0,0 +1,13 @@ +install: + go get tcp_grpc_proxy + go mod download + +protobuf: install + go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26 + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1 + PATH="${PATH}:$(shell go env GOPATH)/bin" \ + protoc -I=proto --go_out=. --go-grpc_out=. proto/*.proto + +build: protobuf + go build -o tcp2grpc cmd/tcp2grpc/main.go + go build -o grpc2tcp cmd/grpc2tcp/main.go diff --git a/tools/tcp_grpc_proxy/cmd/grpc2tcp/main.go b/tools/tcp_grpc_proxy/cmd/grpc2tcp/main.go index 872924922..2b04343bb 100644 --- a/tools/tcp_grpc_proxy/cmd/grpc2tcp/main.go +++ b/tools/tcp_grpc_proxy/cmd/grpc2tcp/main.go @@ -1,9 +1,9 @@ package main import ( - "fedlearner.net/tools/tcp_grpc_proxy/pkg/proxy" "flag" "fmt" + "tcp_grpc_proxy/proxy" ) func main() { @@ -14,6 +14,6 @@ func main() { flag.Parse() grpcServerAddress := fmt.Sprintf("0.0.0.0:%d", grpcServerPort) - grpc2tcpServer := proxy.NewGrpc2TcpServer(grpcServerAddress, targetTCPAddress) + grpc2tcpServer := proxy.NewGrpc2TCPServer(grpcServerAddress, targetTCPAddress) grpc2tcpServer.Run() } diff --git a/tools/tcp_grpc_proxy/cmd/grpcclient/main.go b/tools/tcp_grpc_proxy/cmd/grpcclient/main.go new file mode 100644 index 000000000..670a89f02 --- /dev/null +++ b/tools/tcp_grpc_proxy/cmd/grpcclient/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "bytes" + "context" + "os" + "time" + + "tcp_grpc_proxy/proto" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +func main() { + // Set up a connection to the server. + grpcServer := "127.0.0.1:7766" + conn, err := grpc.Dial(grpcServer, grpc.WithInsecure()) + if err != nil { + logrus.Fatalf("did not connect: %v", err) + } + defer conn.Close() + tsc := proto.NewTunnelServiceClient(conn) + + tc, err := tsc.Tunnel(context.Background()) + if err != nil { + logrus.Fatalln(err) + } + + sendPacket := func(data []byte) error { + return tc.Send(&proto.Chunk{Data: data}) + } + + go func() { + for { + chunk, err := tc.Recv() + if err != nil { + logrus.Println("Recv terminated:", err) + os.Exit(0) + } + logrus.Println(string(chunk.Data)) + } + + }() + + for { + time.Sleep(time.Duration(2) * time.Second) + buf := bytes.NewBufferString("************Hello World**********").Bytes() + sendPacket(buf) + } +} diff --git a/tools/tcp_grpc_proxy/cmd/grpcserver/main.go b/tools/tcp_grpc_proxy/cmd/grpcserver/main.go new file mode 100644 index 000000000..b17e4432f --- /dev/null +++ b/tools/tcp_grpc_proxy/cmd/grpcserver/main.go @@ -0,0 +1,11 @@ +package main + +import ( + "tcp_grpc_proxy/grpc2tcp" +) + +func main() { + grpcServerAddress := "0.0.0.0:7766" + targetTCPAddress := "127.0.0.1:17766" + grpc2tcp.RunServer(grpcServerAddress, targetTCPAddress) +} diff --git a/tools/tcp_grpc_proxy/cmd/tcp2grpc/main.go b/tools/tcp_grpc_proxy/cmd/tcp2grpc/main.go index 9b81e8f75..fee88a884 100644 --- a/tools/tcp_grpc_proxy/cmd/tcp2grpc/main.go +++ b/tools/tcp_grpc_proxy/cmd/tcp2grpc/main.go @@ -1,11 +1,49 @@ package main import ( - "fedlearner.net/tools/tcp_grpc_proxy/pkg/proxy" "flag" "fmt" + "io" + "net" + "os" + "tcp_grpc_proxy/proxy" ) +func test() { + client, err := net.Dial("tcp", "127.0.0.1:17767") + if err != nil { + fmt.Println("err:", err) + return + } + defer client.Close() + + go func() { + input := make([]byte, 1024) + for { + n, err := os.Stdin.Read(input) + if err != nil { + fmt.Println("input err:", err) + continue + } + client.Write([]byte(input[:n])) + } + }() + + buf := make([]byte, 1024) + for { + n, err := client.Read(buf) + if err != nil { + if err == io.EOF { + return + } + fmt.Println("read err:", err) + continue + } + fmt.Println(string(buf[:n])) + + } +} + func main() { var tcpServerPort int var targetGrpcAddress string @@ -14,6 +52,6 @@ func main() { flag.Parse() tcpServerAddress := fmt.Sprintf("0.0.0.0:%d", tcpServerPort) - tcp2grpcServer := proxy.NewTcp2GrpcServer(tcpServerAddress, targetGrpcAddress) + tcp2grpcServer := proxy.NewTCP2GrpcServer(tcpServerAddress, targetGrpcAddress) tcp2grpcServer.Run() } diff --git a/tools/tcp_grpc_proxy/cmd/tcpclient/main.go b/tools/tcp_grpc_proxy/cmd/tcpclient/main.go new file mode 100644 index 000000000..7e0c97467 --- /dev/null +++ b/tools/tcp_grpc_proxy/cmd/tcpclient/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "flag" + "net" + "time" + + "github.com/sirupsen/logrus" +) + +func main() { + var tcpServerAddress string + flag.StringVar(&tcpServerAddress, "tcp_server_address", "127.0.0.1:17767", + "TCP server address which the client connects to.") + + conn, err := net.Dial("tcp", tcpServerAddress) + if err != nil { + logrus.Fatalf("Dail to tcp target %s error: %v", tcpServerAddress, err) + } + logrus.Infoln("Connected to", tcpServerAddress) + // Makes sure the connection gets closed + defer conn.Close() + defer logrus.Infoln("Connection closed to ", tcpServerAddress) + + for { + conn.Write([]byte("hello world")) + logrus.Infof("Sent 'hello world' to server %s", tcpServerAddress) + + tcpData := make([]byte, 64*1024) + _, err := conn.Read(tcpData) + if err != nil { + logrus.Fatalln("Read from tcp error: ", err) + } + logrus.Infof("Received '%s' from server", string(tcpData)) + + time.Sleep(time.Duration(5) * time.Second) + } +} diff --git a/tools/tcp_grpc_proxy/cmd/tcpserver/main.go b/tools/tcp_grpc_proxy/cmd/tcpserver/main.go new file mode 100644 index 000000000..592c7b6bd --- /dev/null +++ b/tools/tcp_grpc_proxy/cmd/tcpserver/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "flag" + "fmt" + "net" + + "github.com/sirupsen/logrus" +) + +func handleTCPConn(conn net.Conn) { + for { + tcpData := make([]byte, 64*1024) + bytesRead, err := conn.Read(tcpData) + if err != nil { + logrus.Fatalln("Read from tcp error: ", err) + } + logrus.Infof("TCP server got %d bytes", bytesRead) + conn.Write([]byte("This is a string from TCP server")) + } +} + +func main() { + var tcpServerPort int + flag.IntVar(&tcpServerPort, "tcp_server_port", 17766, "TCP server port") + flag.Parse() + tcpServerAddress := fmt.Sprintf("0.0.0.0:%d", tcpServerPort) + + listener, err := net.Listen("tcp", tcpServerAddress) + if err != nil { + logrus.Fatalln("Listen TCP error: ", err) + } + defer listener.Close() + logrus.Infoln("Run TCPServer at ", tcpServerAddress) + + for { + conn, err := listener.Accept() + if err != nil { + logrus.Errorln("TCP listener error:", err) + continue + } + + logrus.Infoln("Got tcp connection") + go handleTCPConn(conn) + } +} diff --git a/tools/tcp_grpc_proxy/go.mod b/tools/tcp_grpc_proxy/go.mod new file mode 100644 index 000000000..7507c284b --- /dev/null +++ b/tools/tcp_grpc_proxy/go.mod @@ -0,0 +1,12 @@ +module tcp_grpc_proxy + +go 1.16 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/sirupsen/logrus v1.8.1 + golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect + google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98 // indirect + google.golang.org/grpc v1.38.0 + google.golang.org/protobuf v1.26.0 +) diff --git a/tools/tcp_grpc_proxy/go.sum b/tools/tcp_grpc_proxy/go.sum new file mode 100644 index 000000000..a372202d1 --- /dev/null +++ b/tools/tcp_grpc_proxy/go.sum @@ -0,0 +1,106 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98 h1:LCO0fg4kb6WwkXQXRQQgUYsFeFb5taTX5WAx5O/Vt28= +google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= +google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/tools/tcp_grpc_proxy/proto/tunnel.proto b/tools/tcp_grpc_proxy/proto/tunnel.proto new file mode 100644 index 000000000..22ce1080b --- /dev/null +++ b/tools/tcp_grpc_proxy/proto/tunnel.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package proto; +option go_package = "proxy/proto"; + +service TunnelService { + rpc Tunnel (stream Chunk) returns (stream Chunk); +} + +message Chunk { + bytes data = 1; +} diff --git a/tools/tcp_grpc_proxy/proxy/grpc2tcp.go b/tools/tcp_grpc_proxy/proxy/grpc2tcp.go new file mode 100644 index 000000000..a9c5f598d --- /dev/null +++ b/tools/tcp_grpc_proxy/proxy/grpc2tcp.go @@ -0,0 +1,106 @@ +package proxy + +import ( + "fmt" + "io" + "net" + + "tcp_grpc_proxy/proxy/proto" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +// Grpc2TCPServer A server to proxy grpc traffic to TCP +type Grpc2TCPServer struct { + proto.UnimplementedTunnelServiceServer + grpcServerAddress string + targetTCPAddress string +} + +// Tunnel the implementation of gRPC Tunnel service +func (s *Grpc2TCPServer) Tunnel(stream proto.TunnelService_TunnelServer) error { + tcpConnection, err := net.Dial("tcp", s.targetTCPAddress) + if err != nil { + logrus.Errorf("Dail to tcp target %s error: %v", s.targetTCPAddress, err) + return err + } + logrus.Infoln("Connected to", s.targetTCPAddress) + // Makes sure the connection gets closed + defer tcpConnection.Close() + defer logrus.Infoln("Connection closed to ", s.targetTCPAddress) + + errChan := make(chan error) + + // Gets data from gRPC client and proxy to remote TCP server + go func() { + for { + chunk, err := stream.Recv() + if err == io.EOF { + return + } + if err != nil { + errChan <- fmt.Errorf("error while receiving gRPC data: %v", err) + return + } + + data := chunk.Data + logrus.Infof("Sending %d bytes to tcp server", len(data)) + _, err = tcpConnection.Write(data) + if err != nil { + errChan <- fmt.Errorf("error while sending TCP data: %v", err) + return + } + } + }() + + // Gets data from remote TCP server and proxy to gRPC client + go func() { + buff := make([]byte, 64*1024) + for { + bytesRead, err := tcpConnection.Read(buff) + if err == io.EOF { + logrus.Infoln("Remote TCP connection closed") + return + } + if err != nil { + errChan <- fmt.Errorf("error while receiving TCP data: %v", err) + return + } + + logrus.Infof("Sending %d bytes to gRPC client", bytesRead) + if err = stream.Send(&proto.Chunk{Data: buff[0:bytesRead]}); err != nil { + errChan <- fmt.Errorf("Error while sending gRPC data: %v", err) + return + } + } + }() + + // Blocking read + returnedError := <-errChan + return returnedError +} + +// NewGrpc2TCPServer constructs a Grpc2TCP server +func NewGrpc2TCPServer(grpcServerAddress, targetTCPAddress string) *Grpc2TCPServer { + return &Grpc2TCPServer{ + grpcServerAddress: grpcServerAddress, + targetTCPAddress: targetTCPAddress, + } +} + +// Run starts the Grpc2TCP server +func (s *Grpc2TCPServer) Run() { + listener, err := net.Listen("tcp", s.grpcServerAddress) + if err != nil { + logrus.Errorf("Failed to listen: ", err) + } + + // Starts a gRPC server and register services + grpcServer := grpc.NewServer() + proto.RegisterTunnelServiceServer(grpcServer, s) + logrus.Infof("Starting gRPC server at: %s, target to %s", s.grpcServerAddress, s.targetTCPAddress) + if err := grpcServer.Serve(listener); err != nil { + logrus.Errorln("Unable to start gRPC serve:", err) + } +} diff --git a/tools/tcp_grpc_proxy/proxy/proto/tunnel.pb.go b/tools/tcp_grpc_proxy/proxy/proto/tunnel.pb.go new file mode 100644 index 000000000..79602bc44 --- /dev/null +++ b/tools/tcp_grpc_proxy/proxy/proto/tunnel.pb.go @@ -0,0 +1,147 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.17.3 +// source: tunnel.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Chunk struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *Chunk) Reset() { + *x = Chunk{} + if protoimpl.UnsafeEnabled { + mi := &file_tunnel_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Chunk) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Chunk) ProtoMessage() {} + +func (x *Chunk) ProtoReflect() protoreflect.Message { + mi := &file_tunnel_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Chunk.ProtoReflect.Descriptor instead. +func (*Chunk) Descriptor() ([]byte, []int) { + return file_tunnel_proto_rawDescGZIP(), []int{0} +} + +func (x *Chunk) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +var File_tunnel_proto protoreflect.FileDescriptor + +var file_tunnel_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x74, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1b, 0x0a, 0x05, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x12, + 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x32, 0x39, 0x0a, 0x0d, 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x12, 0x28, 0x0a, 0x06, 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0c, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x1a, 0x0c, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0d, 0x5a, + 0x0b, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_tunnel_proto_rawDescOnce sync.Once + file_tunnel_proto_rawDescData = file_tunnel_proto_rawDesc +) + +func file_tunnel_proto_rawDescGZIP() []byte { + file_tunnel_proto_rawDescOnce.Do(func() { + file_tunnel_proto_rawDescData = protoimpl.X.CompressGZIP(file_tunnel_proto_rawDescData) + }) + return file_tunnel_proto_rawDescData +} + +var file_tunnel_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_tunnel_proto_goTypes = []interface{}{ + (*Chunk)(nil), // 0: proto.Chunk +} +var file_tunnel_proto_depIdxs = []int32{ + 0, // 0: proto.TunnelService.Tunnel:input_type -> proto.Chunk + 0, // 1: proto.TunnelService.Tunnel:output_type -> proto.Chunk + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_tunnel_proto_init() } +func file_tunnel_proto_init() { + if File_tunnel_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_tunnel_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Chunk); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_tunnel_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_tunnel_proto_goTypes, + DependencyIndexes: file_tunnel_proto_depIdxs, + MessageInfos: file_tunnel_proto_msgTypes, + }.Build() + File_tunnel_proto = out.File + file_tunnel_proto_rawDesc = nil + file_tunnel_proto_goTypes = nil + file_tunnel_proto_depIdxs = nil +} diff --git a/tools/tcp_grpc_proxy/proxy/proto/tunnel_grpc.pb.go b/tools/tcp_grpc_proxy/proxy/proto/tunnel_grpc.pb.go new file mode 100644 index 000000000..f60817673 --- /dev/null +++ b/tools/tcp_grpc_proxy/proxy/proto/tunnel_grpc.pb.go @@ -0,0 +1,133 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// TunnelServiceClient is the client API for TunnelService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TunnelServiceClient interface { + Tunnel(ctx context.Context, opts ...grpc.CallOption) (TunnelService_TunnelClient, error) +} + +type tunnelServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewTunnelServiceClient(cc grpc.ClientConnInterface) TunnelServiceClient { + return &tunnelServiceClient{cc} +} + +func (c *tunnelServiceClient) Tunnel(ctx context.Context, opts ...grpc.CallOption) (TunnelService_TunnelClient, error) { + stream, err := c.cc.NewStream(ctx, &TunnelService_ServiceDesc.Streams[0], "/proto.TunnelService/Tunnel", opts...) + if err != nil { + return nil, err + } + x := &tunnelServiceTunnelClient{stream} + return x, nil +} + +type TunnelService_TunnelClient interface { + Send(*Chunk) error + Recv() (*Chunk, error) + grpc.ClientStream +} + +type tunnelServiceTunnelClient struct { + grpc.ClientStream +} + +func (x *tunnelServiceTunnelClient) Send(m *Chunk) error { + return x.ClientStream.SendMsg(m) +} + +func (x *tunnelServiceTunnelClient) Recv() (*Chunk, error) { + m := new(Chunk) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// TunnelServiceServer is the server API for TunnelService service. +// All implementations must embed UnimplementedTunnelServiceServer +// for forward compatibility +type TunnelServiceServer interface { + Tunnel(TunnelService_TunnelServer) error + mustEmbedUnimplementedTunnelServiceServer() +} + +// UnimplementedTunnelServiceServer must be embedded to have forward compatible implementations. +type UnimplementedTunnelServiceServer struct { +} + +func (UnimplementedTunnelServiceServer) Tunnel(TunnelService_TunnelServer) error { + return status.Errorf(codes.Unimplemented, "method Tunnel not implemented") +} +func (UnimplementedTunnelServiceServer) mustEmbedUnimplementedTunnelServiceServer() {} + +// UnsafeTunnelServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TunnelServiceServer will +// result in compilation errors. +type UnsafeTunnelServiceServer interface { + mustEmbedUnimplementedTunnelServiceServer() +} + +func RegisterTunnelServiceServer(s grpc.ServiceRegistrar, srv TunnelServiceServer) { + s.RegisterService(&TunnelService_ServiceDesc, srv) +} + +func _TunnelService_Tunnel_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TunnelServiceServer).Tunnel(&tunnelServiceTunnelServer{stream}) +} + +type TunnelService_TunnelServer interface { + Send(*Chunk) error + Recv() (*Chunk, error) + grpc.ServerStream +} + +type tunnelServiceTunnelServer struct { + grpc.ServerStream +} + +func (x *tunnelServiceTunnelServer) Send(m *Chunk) error { + return x.ServerStream.SendMsg(m) +} + +func (x *tunnelServiceTunnelServer) Recv() (*Chunk, error) { + m := new(Chunk) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// TunnelService_ServiceDesc is the grpc.ServiceDesc for TunnelService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TunnelService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "proto.TunnelService", + HandlerType: (*TunnelServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Tunnel", + Handler: _TunnelService_Tunnel_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "tunnel.proto", +} diff --git a/tools/tcp_grpc_proxy/proxy/tcp2grpc.go b/tools/tcp_grpc_proxy/proxy/tcp2grpc.go new file mode 100644 index 000000000..63b5586b8 --- /dev/null +++ b/tools/tcp_grpc_proxy/proxy/tcp2grpc.go @@ -0,0 +1,104 @@ +package proxy + +import ( + "context" + "io" + "net" + "tcp_grpc_proxy/proxy/proto" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +// TCP2GrpcServer to proxy TCP traffic to gRPC +type TCP2GrpcServer struct { + tcpServerAddress string + targetGrpcAddress string +} + +// NewTCP2GrpcServer constructs a TCP2GrpcServer +func NewTCP2GrpcServer(tcpServerAddress, targetGrpcAddress string) *TCP2GrpcServer { + return &TCP2GrpcServer{ + tcpServerAddress: tcpServerAddress, + targetGrpcAddress: targetGrpcAddress, + } +} + +func handleTCPConn(tcpConn net.Conn, targetGrpcAddress string) { + logrus.Infoln("Handle tcp connection, target to:", targetGrpcAddress) + defer tcpConn.Close() + + grpcConn, err := grpc.Dial(targetGrpcAddress, grpc.WithInsecure()) + if err != nil { + logrus.Errorf("Error during connect to grpc %s: %v", targetGrpcAddress, err) + return + } + defer grpcConn.Close() + + grpcClient := proto.NewTunnelServiceClient(grpcConn) + stream, err := grpcClient.Tunnel(context.Background()) + if err != nil { + logrus.Errorf("Error of tunnel service: %v", err) + return + } + + // Gets data from remote gRPC server and proxy to TCP client + go func() { + for { + chunk, err := stream.Recv() + if err != nil { + logrus.Errorf("Recv from grpc target %s terminated: %v", targetGrpcAddress, err) + return + } + logrus.Infof("Sending %d bytes to TCP client", len(chunk.Data)) + tcpConn.Write(chunk.Data) + } + }() + + // Gets data from TCP client and proxy to remote gRPC server + func() { + for { + tcpData := make([]byte, 64*1024) + bytesRead, err := tcpConn.Read(tcpData) + + if err == io.EOF { + logrus.Infoln("Connection finished") + return + } + if err != nil { + logrus.Errorf("Read from tcp error: %v", err) + return + } + logrus.Infof("Sending %d bytes to gRPC server", bytesRead) + if err := stream.Send(&proto.Chunk{Data: tcpData[0:bytesRead]}); err != nil { + logrus.Errorf("Failed to send gRPC data: %v", err) + return + } + } + }() + + // If tcp connection gets closed, then we close the gRPC connection. + stream.CloseSend() + return +} + +// Run Starts the server +func (s *TCP2GrpcServer) Run() { + listener, err := net.Listen("tcp", s.tcpServerAddress) + if err != nil { + logrus.Fatalln("Listen TCP error: ", err) + } + defer listener.Close() + logrus.Infoln("Run TCPServer at ", s.tcpServerAddress) + + for { + conn, err := listener.Accept() + if err != nil { + logrus.Errorln("TCP listener error:", err) + continue + } + + logrus.Infoln("Got tcp connection") + go handleTCPConn(conn, s.targetGrpcAddress) + } +}