diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index 4fe1d894df..4a7f680bed 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -74,6 +74,9 @@ var ( // client having already connected to the RPC server. ErrClientAlreadyConnected = errors.New("websocket client has already " + "connected") + + // ErrEmptyBatch is an error to describe that there is nothing to send. + ErrEmptyBatch = errors.New("batch is empty") ) const ( @@ -151,6 +154,7 @@ type Client struct { // whether or not to batch requests, false unless changed by Batch() batch bool + batchLock sync.Mutex batchList *list.List // retryCount holds the number of times the client has tried to @@ -214,7 +218,10 @@ func (c *Client) addRequest(jReq *jsonRequest) error { element := c.requestList.PushBack(jReq) c.requestMap[jReq.id] = element } else { + c.batchLock.Lock() element := c.batchList.PushBack(jReq) + c.batchLock.Unlock() + c.requestMap[jReq.id] = element } return nil @@ -238,7 +245,9 @@ func (c *Client) removeRequest(id uint64) *jsonRequest { var request *jsonRequest if c.batch { + c.batchLock.Lock() request = c.batchList.Remove(element).(*jsonRequest) + c.batchLock.Unlock() } else { request = c.requestList.Remove(element).(*jsonRequest) } @@ -1656,7 +1665,15 @@ func (c *Client) BackendVersion() (BackendVersion, error) { return c.backendVersion, nil } -func (c *Client) sendAsync() FutureGetBulkResult { +func (c *Client) sendAsync() (FutureGetBulkResult, error) { + c.batchLock.Lock() + defer c.batchLock.Unlock() + + // if batchlist is empty, there's nothing to send + if c.batchList.Len() == 0 { + return nil, ErrEmptyBatch + } + // convert the array of marshalled json requests to a single request we can send responseChan := make(chan *Response, 1) marshalledRequest := []byte("[") @@ -1678,25 +1695,28 @@ func (c *Client) sendAsync() FutureGetBulkResult { responseChan: responseChan, } c.sendPostRequest(&request) - return responseChan + return responseChan, nil } // Marshall's bulk requests and sends to the server // creates a response channel to receive the response func (c *Client) Send() error { - // if batchlist is empty, there's nothing to send - if c.batchList.Len() == 0 { - return nil + future, err := c.sendAsync() + if err != nil { + return err } - batchResp, err := c.sendAsync().Receive() + batchResp, err := future.Receive() if err != nil { // Clear batchlist in case of an error. // // TODO(yy): need to double check to make sure there's no // concurrent access to this batch list, otherwise we may miss // some batched requests. + + c.batchLock.Lock() c.batchList = list.New() + c.batchLock.Unlock() return err }