Skip to content

Commit

Permalink
add connection status notification
Browse files Browse the repository at this point in the history
  • Loading branch information
kubemq committed Jun 28, 2021
1 parent 075cda8 commit ccf2b06
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 40 deletions.
17 changes: 9 additions & 8 deletions queues_stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesStreamClie
if err != nil {
return nil, err
}
return &QueuesStreamClient{
clientCtx: ctx,
client: client,
upstream: newUpstream(ctx, client.KubemqClient),
downstream: newDownstream(ctx, client.KubemqClient),
}, nil
c := &QueuesStreamClient{
clientCtx: ctx,
client: client,
}
c.upstream = newUpstream(ctx, c)
c.downstream = newDownstream(ctx, c)
return c, nil
}
func (q *QueuesStreamClient) Send(ctx context.Context, messages ...*QueueMessage) (*SendResult, error) {
if !q.upstream.isReady() {
return nil, fmt.Errorf("kubemq client connection lost, can't send messages ")
return nil, fmt.Errorf("kubemq grpc client connection lost, can't send messages ")
}
if len(messages) == 0 {
return nil, fmt.Errorf("no messages to send")
Expand All @@ -57,7 +58,7 @@ func (q *QueuesStreamClient) Send(ctx context.Context, messages ...*QueueMessage

func (q *QueuesStreamClient) Poll(ctx context.Context, request *PollRequest) (*PollResponse, error) {
if !q.downstream.isReady() {
return nil, fmt.Errorf("kubemq client connection lost, can't poll messages")
return nil, fmt.Errorf("kubemq grpc client connection lost, can't poll messages")
}
pollReq, err := q.downstream.poll(ctx, request, q.client.GlobalClientId())
return pollReq, err
Expand Down
24 changes: 19 additions & 5 deletions queues_stream/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ type downstream struct {
responseCh chan *pb.QueuesDownstreamResponse
errCh chan error
doneCh chan bool
client pb.KubemqClient
grpcClient pb.KubemqClient
streamClient *QueuesStreamClient
isClosed bool
connectionState *atomic.Bool
}

func newDownstream(ctx context.Context, client pb.KubemqClient) *downstream {
func newDownstream(ctx context.Context, streamClient *QueuesStreamClient) *downstream {

d := &downstream{
Mutex: sync.Mutex{},
Expand All @@ -37,7 +38,8 @@ func newDownstream(ctx context.Context, client pb.KubemqClient) *downstream {
responseCh: make(chan *pb.QueuesDownstreamResponse, 10),
errCh: make(chan error, 10),
doneCh: make(chan bool, 1),
client: client,
grpcClient: streamClient.client.KubemqClient,
streamClient: streamClient,
isClosed: false,
connectionState: atomic.NewBool(false),
}
Expand All @@ -61,6 +63,13 @@ func (d *downstream) getIsClose() bool {
defer d.Unlock()
return d.isClosed
}
func (d *downstream) sendOnConnectionState(msg string) {
if d.streamClient.client.opts.connectionNotificationFunc != nil {
go func() {
d.streamClient.client.opts.connectionNotificationFunc(msg)
}()
}
}
func (d *downstream) createPendingTransaction(request *pb.QueuesDownstreamRequest) *responseHandler {
d.Lock()
defer d.Unlock()
Expand Down Expand Up @@ -109,13 +118,16 @@ func (d *downstream) connectStream(ctx context.Context) {
defer func() {
d.doneCh <- true
d.connectionState.Store(false)
d.sendOnConnectionState(fmt.Sprintf("grpc queue client downstream disconnected"))
}()
stream, err := d.client.QueuesDownstream(ctx)
stream, err := d.grpcClient.QueuesDownstream(ctx)
if err != nil {
d.errCh <- err
d.sendOnConnectionState(fmt.Sprintf("grpc queue client downstream connection error, %s", err.Error()))
return
}
d.connectionState.Store(true)
d.sendOnConnectionState(fmt.Sprintf("grpc queue client downstream connected"))
go func() {
for {
res, err := stream.Recv()
Expand All @@ -124,6 +136,7 @@ func (d *downstream) connectStream(ctx context.Context) {
return
}
d.errCh <- err
d.sendOnConnectionState(fmt.Sprintf("grpc queue client downstream receive error, %s", err.Error()))
return
}
select {
Expand All @@ -145,6 +158,7 @@ func (d *downstream) connectStream(ctx context.Context) {
return
}
d.errCh <- err
d.sendOnConnectionState(fmt.Sprintf("grpc queue client downstream send error, %s", err.Error()))
return
}
case <-stream.Context().Done():
Expand Down Expand Up @@ -265,7 +279,7 @@ func (d *downstream) poll(ctx context.Context, request *PollRequest, clientId st
}
return pollResponse, nil
case connectionErr := <-respHandler.errCh:
return nil, fmt.Errorf("client connection error, %s", connectionErr.Error())
return nil, fmt.Errorf("grpcClient connection error, %s", connectionErr.Error())
case <-waitResponseCtx.Done():
d.deletePendingTransaction(pbReq.RequestID)
return nil, fmt.Errorf("timout waiting response for poll messages request")
Expand Down
2 changes: 1 addition & 1 deletion queues_stream/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (qm *QueueMessage) SetId(id string) *QueueMessage {

}

// SetClientId - set queue message ClientId - mandatory if default client was not set
// SetClientId - set queue message ClientId - mandatory if default grpcClient was not set
func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage {
qm.ClientID = clientId
return qm
Expand Down
50 changes: 29 additions & 21 deletions queues_stream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@ type Option interface {
}

type Options struct {
host string
port int
isSecured bool
certFile string
certData string
serverOverrideDomain string
authToken string
clientId string
receiveBufferSize int
defaultChannel string
defaultCacheTTL time.Duration
restUri string
webSocketUri string
autoReconnect bool
reconnectInterval time.Duration
maxReconnect int
checkConnection bool
host string
port int
isSecured bool
certFile string
certData string
serverOverrideDomain string
authToken string
clientId string
receiveBufferSize int
defaultChannel string
defaultCacheTTL time.Duration
restUri string
webSocketUri string
autoReconnect bool
reconnectInterval time.Duration
maxReconnect int
checkConnection bool
connectionNotificationFunc func(msg string)
}

type funcOptions struct {
Expand All @@ -54,7 +55,14 @@ func WithAddress(host string, port int) Option {
})
}

// WithCredentials - set secured TLS credentials from the input certificate file for client.
// WithConnectionNotificationFunc - set on connection activity messages
func WithConnectionNotificationFunc(fn func(msg string)) Option {
return newFuncOption(func(o *Options) {
o.connectionNotificationFunc = fn
})
}

// WithCredentials - set secured TLS credentials from the input certificate file for grpcClient.
// serverNameOverride is for testing only. If set to a non empty string,
// it will override the virtual host name of authority (e.g. :authority header field) in requests.
func WithCredentials(certFile, serverOverrideDomain string) Option {
Expand All @@ -65,7 +73,7 @@ func WithCredentials(certFile, serverOverrideDomain string) Option {
})
}

// WithCertificate - set secured TLS credentials from the input certificate data for client.
// WithCertificate - set secured TLS credentials from the input certificate data for grpcClient.
// serverNameOverride is for testing only. If set to a non empty string,
// it will override the virtual host name of authority (e.g. :authority header field) in requests.
func WithCertificate(certData, serverOverrideDomain string) Option {
Expand All @@ -83,7 +91,7 @@ func WithAuthToken(token string) Option {
})
}

// WithClientId - set client id to be used in all functions call with this client - mandatory
// WithClientId - set grpcClient id to be used in all functions call with this grpcClient - mandatory
func WithClientId(id string) Option {
return newFuncOption(func(o *Options) {
o.clientId = id
Expand Down Expand Up @@ -132,7 +140,7 @@ func WithMaxReconnects(value int) Option {
})
}

// WithCheckConnection - set server connectivity on client create
// WithCheckConnection - set server connectivity on grpcClient create
func WithCheckConnection(value bool) Option {
return newFuncOption(func(o *Options) {
o.checkConnection = value
Expand Down
24 changes: 19 additions & 5 deletions queues_stream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queues_stream

import (
"context"
"fmt"
pb "github.com/kubemq-io/protobuf/go"
"go.uber.org/atomic"
"io"
Expand All @@ -18,12 +19,13 @@ type upstream struct {
responseCh chan *pb.QueuesUpstreamResponse
errCh chan error
doneCh chan bool
client pb.KubemqClient
grpcClient pb.KubemqClient
streamClient *QueuesStreamClient
isClosed bool
connectionState *atomic.Bool
}

func newUpstream(ctx context.Context, client pb.KubemqClient) *upstream {
func newUpstream(ctx context.Context, streamClient *QueuesStreamClient) *upstream {

u := &upstream{
Mutex: sync.Mutex{},
Expand All @@ -32,15 +34,22 @@ func newUpstream(ctx context.Context, client pb.KubemqClient) *upstream {
requestCh: make(chan *pb.QueuesUpstreamRequest, 10),
responseCh: make(chan *pb.QueuesUpstreamResponse, 10),
doneCh: make(chan bool, 1),
client: client,
grpcClient: streamClient.client.KubemqClient,
streamClient: streamClient,
connectionState: atomic.NewBool(false),
}
u.upstreamCtx, u.upstreamCancel = context.WithCancel(ctx)
go u.run()
time.Sleep(time.Second)
return u
}

func (u *upstream) sendOnConnectionState(msg string) {
if u.streamClient.client.opts.connectionNotificationFunc != nil {
go func() {
u.streamClient.client.opts.connectionNotificationFunc(msg)
}()
}
}
func (u *upstream) close() {
u.setIsClose(true)
u.upstreamCancel()
Expand Down Expand Up @@ -79,13 +88,16 @@ func (u *upstream) connectStream(ctx context.Context) {
defer func() {
u.doneCh <- true
u.connectionState.Store(false)
u.sendOnConnectionState(fmt.Sprintf("grpc queue client upstream disconnected"))
}()
stream, err := u.client.QueuesUpstream(ctx)
stream, err := u.grpcClient.QueuesUpstream(ctx)
if err != nil {
u.errCh <- err
u.sendOnConnectionState(fmt.Sprintf("grpc queue client upstream connection error, %s", err.Error()))
return
}
u.connectionState.Store(true)
u.sendOnConnectionState(fmt.Sprintf("grpc queue client upstream connected"))
go func() {
for {
res, err := stream.Recv()
Expand All @@ -94,6 +106,7 @@ func (u *upstream) connectStream(ctx context.Context) {
return
}
u.errCh <- err
u.sendOnConnectionState(fmt.Sprintf("grpc queue client upstream receive error, %s", err.Error()))
return
}
select {
Expand All @@ -115,6 +128,7 @@ func (u *upstream) connectStream(ctx context.Context) {
return
}
u.errCh <- err
u.sendOnConnectionState(fmt.Sprintf("grpc queue client updatream send error, %s", err.Error()))
return
}
case <-stream.Context().Done():
Expand Down

0 comments on commit ccf2b06

Please sign in to comment.