Skip to content

Commit

Permalink
Refactor code and remove extraneous methods
Browse files Browse the repository at this point in the history
The code in the transport.go file has been significantly streamlined by removing hundreds of lines of commented-out methods and functions. Furthermore, the respective file has been moved to the grpc directory. These changes improve the readability and maintenance of the codebase.
  • Loading branch information
liornabat committed Jan 30, 2024
1 parent 4d90aff commit 04b383d
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 1,053 deletions.
File renamed without changes.
File renamed without changes.
32 changes: 11 additions & 21 deletions v2/pubsub/grpc/tls.go → v2/grpc/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package grpc
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"github.com/kubemq-io/kubemq-go/v2/config"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"os"
)

func getTLSConnectionOptions(cfg *config.TlsConfig) ([]grpc.DialOption, error) {
Expand All @@ -22,33 +22,23 @@ func getTLSConnectionOptions(cfg *config.TlsConfig) ([]grpc.DialOption, error) {
options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
return options, nil
}
if cfg.SkipVerifyInsecure {
options = append(options)
}
if cfg.Cert == "" && cfg.Key == "" {

if cfg.CertFile == "" && cfg.KeyFile == "" {
return options, nil
}
certBlock, _ := pem.Decode([]byte(cfg.Cert))
if certBlock == nil {
return nil, fmt.Errorf("failed to parse tls certificate PEM")
}
keyBlock, _ := pem.Decode([]byte(cfg.Key))
if keyBlock == nil {
return nil, fmt.Errorf("failed to parse tls key PEM")
}
clientCert, err := tls.X509KeyPair(pem.EncodeToMemory(certBlock), pem.EncodeToMemory(keyBlock))
clientCert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client cert and key: %w", err)
return nil, fmt.Errorf("failed to load client cert %s and key %s, %w", cfg.CertFile, cfg.KeyFile, err)
}
var certPool *x509.CertPool
if cfg.Ca != "" {
caBlock, _ := pem.Decode([]byte(cfg.Ca))
if caBlock == nil {
return nil, fmt.Errorf("failed to parse tls ca PEM")
if cfg.CaFile != "" {
caCert, err := os.ReadFile(cfg.CaFile)
if err != nil {
return nil, fmt.Errorf("failed to read ca cert file %s, %w", cfg.CaFile, err)
}
certPool = x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(caBlock.Bytes); !ok {
return nil, fmt.Errorf("failed to append ca certs")
if ok := certPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("failed to append ca cert %s to cert pool", cfg.CaFile)
}
}
tlsConfig := &tls.Config{
Expand Down
105 changes: 105 additions & 0 deletions v2/grpc/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package grpc

import (
"context"
"fmt"
"github.com/kubemq-io/kubemq-go/v2/config"
pb "github.com/kubemq-io/protobuf/go"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type Transport struct {
opts *config.Connection
conn *grpc.ClientConn
client pb.KubemqClient
isClosed *atomic.Bool
}

func NewTransport(ctx context.Context, cfg *config.Connection) (*Transport, error) {
t := &Transport{
opts: cfg,
conn: nil,
client: nil,
isClosed: atomic.NewBool(false),
}

connOptions := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxReceiveSize), grpc.MaxCallSendMsgSize(cfg.MaxSendSize)),
grpc.WithUnaryInterceptor(t.setUnaryInterceptor()),
grpc.WithStreamInterceptor(t.setStreamInterceptor()),
}
tlsOptions, err := getTLSConnectionOptions(cfg.Tls)
if err != nil {
return nil, err
}
if tlsOptions != nil {
connOptions = append(connOptions, tlsOptions...)
}
if keepAliveOptions := getKeepAliveConnectionOptions(cfg.KeepAlive); keepAliveOptions != nil {
connOptions = append(connOptions, keepAliveOptions...)
}

t.conn, err = grpc.DialContext(ctx, cfg.Address, connOptions...)
if err != nil {
return nil, fmt.Errorf("error connecting to kubemq server, %w", err)
}
go func() {
<-ctx.Done()
if t.conn != nil {
_ = t.conn.Close()
}
}()
t.client = pb.NewKubemqClient(t.conn)
_, err = t.Ping(ctx)
if err != nil {
return nil, err
}

return t, nil
}

func (t *Transport) setUnaryInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if t.opts.AuthToken != "" {
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", t.opts.AuthToken)
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}

func (t *Transport) setStreamInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
if t.opts.AuthToken != "" {
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", t.opts.AuthToken)
}
return streamer(ctx, desc, cc, method, opts...)
}
}

func (t *Transport) Ping(ctx context.Context) (*ServerInfo, error) {
res, err := t.client.Ping(ctx, &pb.Empty{})
if err != nil {
return nil, fmt.Errorf("error connecting to kubemq server, %w", err)
}
si := &ServerInfo{
Host: res.Host,
Version: res.Version,
ServerStartTime: res.ServerStartTime,
ServerUpTimeSeconds: res.ServerUpTimeSeconds,
}
return si, nil
}

func (t *Transport) KubeMQClient() pb.KubemqClient {
return t.client
}
func (t *Transport) Close() error {
err := t.conn.Close()
if err != nil {
return err
}
t.isClosed.Store(true)
return nil
}
Loading

0 comments on commit 04b383d

Please sign in to comment.