Commit 28eec46

General improvements:
Added sharded cache to reduce lock contention
Optimized buffer usage with pre-sized pools
Improved HTTP client with connection pooling and timeouts
Reduced allocations in JSON handling
lukaszraczylo committed Jan 8, 2025
1 parent 63e449e commit 28eec46
Showing 4 changed files with 151 additions and 63 deletions.
91 changes: 59 additions & 32 deletions cache/cache.go
@@ -3,6 +3,7 @@ package libpack_cache
 import (
 	"bytes"
 	"compress/gzip"
+	"hash/fnv"
 	"io"
 	"sync"
 	"time"
@@ -13,12 +14,25 @@ type CacheEntry struct {
 	Value     []byte
 }
 
+const shardCount = 256 // power of two: the modulo in getShard compiles to a cheap bit mask
+
+type shard struct {
+	entries map[string]CacheEntry
+	sync.RWMutex
+}
+
 type Cache struct {
 	compressPool   sync.Pool
 	decompressPool sync.Pool
-	entries        sync.Map
-	globalTTL      time.Duration
-	sync.RWMutex
+	shards    [shardCount]*shard
+	globalTTL time.Duration
 }
+
+// getShard returns the appropriate shard for a given key
+func (c *Cache) getShard(key string) *shard {
+	hash := fnv.New32a()
+	hash.Write([]byte(key))
+	return c.shards[hash.Sum32()%shardCount]
+}
 
 func New(globalTTL time.Duration) *Cache {
@@ -32,13 +46,19 @@ func New(globalTTL time.Duration) *Cache {
 		},
 		decompressPool: sync.Pool{
 			New: func() interface{} {
-				r, _ := gzip.NewReader(bytes.NewReader([]byte{}))
+				// gzip.NewReader reads the header immediately and fails on
+				// empty input, so prime the pooled reader with a valid empty
+				// gzip stream; callers Reset it to real data before use.
+				var b bytes.Buffer
+				zw := gzip.NewWriter(&b)
+				zw.Close()
+				r, _ := gzip.NewReader(&b)
 				return r
 			},
 		},
 	}
 
+	// Initialize shards
+	for i := 0; i < shardCount; i++ {
+		cache.shards[i] = &shard{
+			entries: make(map[string]CacheEntry),
+		}
+	}
+
 	go cache.cleanupRoutine(globalTTL)
 	return cache
 }
@@ -52,33 +72,43 @@ func (c *Cache) cleanupRoutine(globalTTL time.Duration) {
 	}
 }
 
 func (c *Cache) Set(key string, value []byte, ttl time.Duration) {
-	c.Lock() // use the lock
-	defer c.Unlock()
-
-	expiresAt := time.Now().Add(ttl)
+	shard := c.getShard(key)
+	shard.Lock()
+	defer shard.Unlock()
 
 	compressedValue, err := c.compress(value)
 	if err != nil {
 		return
 	}
 
-	entry := CacheEntry{
+	shard.entries[key] = CacheEntry{
 		Value:     compressedValue,
-		ExpiresAt: expiresAt,
+		ExpiresAt: time.Now().Add(ttl),
 	}
-	c.entries.Store(key, entry)
 }
 
 func (c *Cache) Get(key string) ([]byte, bool) {
-	c.RLock() // use the read lock
-	defer c.RUnlock()
-
-	entry, ok := c.entries.Load(key)
-	if !ok || entry.(CacheEntry).ExpiresAt.Before(time.Now()) {
+	shard := c.getShard(key)
+	shard.RLock()
+	entry, ok := shard.entries[key]
+	if !ok {
+		shard.RUnlock()
+		return nil, false
+	}
+
+	if entry.ExpiresAt.Before(time.Now()) {
+		shard.RUnlock()
+		// Clean up the expired entry in the background, re-checking
+		// under the write lock in case a fresh value was stored between
+		// the two lock acquisitions
+		go func() {
+			shard.Lock()
+			if e, ok := shard.entries[key]; ok && e.ExpiresAt.Before(time.Now()) {
+				delete(shard.entries, key)
+			}
+			shard.Unlock()
+		}()
 		return nil, false
 	}
-	compressedValue := entry.(CacheEntry).Value
-	value, err := c.decompress(compressedValue)
+	shard.RUnlock()
 
+	value, err := c.decompress(entry.Value)
 	if err != nil {
 		return nil, false
 	}
@@ -87,26 +117,23 @@ func (c *Cache) Get(key string) ([]byte, bool) {
 }
 
 func (c *Cache) Delete(key string) {
-	c.Lock()
-	defer c.Unlock()
-
-	_, ok := c.entries.Load(key)
-	if !ok {
-		return
-	}
-
-	c.entries.Delete(key)
+	shard := c.getShard(key)
+	shard.Lock()
+	delete(shard.entries, key)
+	shard.Unlock()
 }
 
 func (c *Cache) CleanExpiredEntries() {
 	now := time.Now()
-	c.entries.Range(func(key, value interface{}) bool {
-		entry := value.(CacheEntry)
-		if entry.ExpiresAt.Before(now) {
-			c.entries.Delete(key)
+	for _, shard := range c.shards {
+		shard.Lock()
+		for key, entry := range shard.entries {
+			if entry.ExpiresAt.Before(now) {
+				delete(shard.entries, key)
+			}
 		}
-		return true
-	})
+		shard.Unlock()
+	}
 }
 
 func (c *Cache) compress(data []byte) ([]byte, error) {
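For context, a minimal usage sketch of the sharded cache as exposed by the API above; the import path is an assumption based on the repository layout:

package main

import (
	"fmt"
	"time"

	libpack_cache "github.com/lukaszraczylo/go-simple-graphql/cache"
)

func main() {
	c := libpack_cache.New(5 * time.Minute) // globalTTL also drives the background cleanup
	c.Set("user:42", []byte(`{"name":"Ada"}`), 30*time.Second)
	if v, ok := c.Get("user:42"); ok {
		fmt.Println(string(v)) // stored gzip-compressed, decompressed on Get
	}
}

Keys are hashed with FNV-1a onto one of 256 shards, so writers to different keys rarely contend on the same lock.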
42 changes: 31 additions & 11 deletions execute_query.go
@@ -7,19 +7,29 @@ import (
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/avast/retry-go/v4"
"github.com/goccy/go-json"
libpack_logger "github.com/lukaszraczylo/go-simple-graphql/logging"
)

var bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
var (
// Shared HTTP transport with optimized settings
defaultTransport = &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
DisableCompression: false, // Enable compression for responses
ForceAttemptHTTP2: true,
}

// Shared HTTP client with timeouts
defaultClient = &http.Client{
Transport: defaultTransport,
Timeout: 30 * time.Second,
}
)

func (qe *QueryExecutor) executeQuery() ([]byte, error) {
// Reuse buffer from pool to avoid allocations
@@ -62,7 +72,12 @@ func (qe *QueryExecutor) executeQuery() ([]byte, error) {
 	httpRequest.Body = io.NopCloser(bytes.NewReader(buf.Bytes()))
 	httpRequest.ContentLength = int64(buf.Len())
 
-	httpResponse, err := qe.client.Do(httpRequest)
+	// Use default client if custom client is not set
+	client := qe.client
+	if client == nil {
+		client = defaultClient
+	}
+	httpResponse, err := client.Do(httpRequest)
 	if err != nil {
 		return err
 	}
@@ -88,14 +103,18 @@ func (qe *QueryExecutor) executeQuery() ([]byte, error) {
 		reader = httpResponse.Body
 	}
 
-	// Read all response body
-	body, err := io.ReadAll(reader)
+	// Use buffer pool for reading response
+	respBuf := bufferPool.Get().(*bytes.Buffer)
+	respBuf.Reset()
+	defer bufferPool.Put(respBuf)
+
+	_, err = io.Copy(respBuf, reader)
 	if err != nil {
 		return fmt.Errorf("error reading HTTP response: %w", err)
 	}
 
-	// Unmarshal response
-	err = json.Unmarshal(body, &queryResult)
+	// Unmarshal response directly from buffer
+	err = json.Unmarshal(respBuf.Bytes(), &queryResult)
 	if err != nil {
 		return fmt.Errorf("error unmarshalling HTTP response: %w", err)
 	}
@@ -111,6 +130,7 @@ func (qe *QueryExecutor) executeQuery() ([]byte, error) {
 		retry.Attempts(uint(retriesMax)),
 		retry.DelayType(retry.BackOffDelay),
 		retry.Delay(time.Duration(qe.retries_delay)),
+		retry.MaxDelay(10 * time.Second),
 		retry.LastErrorOnly(true),
 	)
 	if err != nil {
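To illustrate how the retry options above combine: BackOffDelay doubles the wait after each attempt starting from Delay, and the newly added MaxDelay caps that growth at 10 seconds. A standalone sketch using the same retry-go/v4 options (the failing function is a stand-in for the HTTP call):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/avast/retry-go/v4"
)

func main() {
	attempts := 0
	err := retry.Do(
		func() error {
			attempts++
			return errors.New("upstream unavailable") // always fails, to show the backoff
		},
		retry.Attempts(5),
		retry.Delay(200*time.Millisecond),   // first retry after ~200ms
		retry.DelayType(retry.BackOffDelay), // then ~400ms, ~800ms, ...
		retry.MaxDelay(10*time.Second),      // ...capped at 10s per wait
		retry.LastErrorOnly(true),           // return only the final error
	)
	fmt.Println(attempts, err)
}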
24 changes: 24 additions & 0 deletions pools.go
@@ -0,0 +1,24 @@
+package gql
+
+import (
+	"bytes"
+	"sync"
+)
+
+var (
+	// Pre-sized buffer pool for better performance
+	bufferPool = sync.Pool{
+		New: func() interface{} {
+			// Pre-allocate buffer with 4KB capacity
+			b := bytes.NewBuffer(make([]byte, 0, 4096))
+			return b
+		},
+	}
+
+	// Pre-allocate error maps to reduce allocations
+	errPairsPool = sync.Pool{
+		New: func() interface{} {
+			return make(map[string]interface{}, 1)
+		},
+	}
+)
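The checkout pattern these pools expect, sketched for illustration (buildPayload is hypothetical, not part of the commit): Reset a pooled buffer before use, since Put returns it with its old contents intact, and copy the bytes out before the deferred Put hands the backing array to the next user.

package gql

import "bytes"

func buildPayload(query string) []byte {
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()               // clear whatever the previous user left behind
	defer bufferPool.Put(buf) // return the buffer for reuse

	buf.WriteString(`{"query":"`)
	buf.WriteString(query) // assumed already JSON-escaped in this sketch
	buf.WriteString(`"}`)

	// Copy out: the backing array is reused after Put
	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	return out
}

The errPairsPool maps come back with their previous "error" key still set; that is safe here only because every user overwrites that same key before logging.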
57 changes: 37 additions & 20 deletions query.go
@@ -11,56 +11,73 @@ import (
 )
 
 func (b *BaseClient) convertToJSON(v any) []byte {
 	// Reuse buffer to reduce allocations
 	buf := bufferPool.Get().(*bytes.Buffer)
 	defer bufferPool.Put(buf)
 	buf.Reset()
 
-	// Use json.Marshal to avoid adding extra newline
-	jsonData, err := json.Marshal(v)
-	if err != nil {
+	// Use json.NewEncoder for better performance with buffers
+	enc := json.NewEncoder(buf)
+	enc.SetEscapeHTML(false) // Reduce unnecessary escaping
+
+	if err := enc.Encode(v); err != nil {
+		errPairs := errPairsPool.Get().(map[string]interface{})
+		errPairs["error"] = err.Error()
 		b.Logger.Error(&libpack_logger.LogMessage{
 			Message: "Can't convert to JSON",
-			Pairs:   map[string]interface{}{"error": err.Error()},
+			Pairs:   errPairs,
 		})
+		errPairsPool.Put(errPairs)
 		return nil
 	}
 
-	// Copy the bytes to the buffer
-	buf.Write(jsonData)
-
-	// Copy the bytes to a new slice before returning
-	result := make([]byte, buf.Len())
-	copy(result, buf.Bytes())
+	// Get the buffer bytes directly, trimming the trailing newline
+	// that Encode appends
+	out := buf.Bytes()
+	if len(out) > 0 && out[len(out)-1] == '\n' {
+		out = out[:len(out)-1]
+	}
+
+	// Make a copy of the bytes since the buffer will be reused
+	result := make([]byte, len(out))
+	copy(result, out)
 	return result
 }

 func (b *BaseClient) compileQuery(queryPartials ...any) *Query {
 	var query string
 	var variables map[string]interface{}
-	for _, partial := range queryPartials {
-		switch val := partial.(type) {
-		case string:
-			query = val
-		case map[string]interface{}:
-			variables = val
-		}
-	}
+
+	// The first partial is expected to be the query string
+	if len(queryPartials) > 0 {
+		if str, ok := queryPartials[0].(string); ok {
+			query = str
+		}
+	}
+
+	// Only read a variables map if we have more than one partial
+	if len(queryPartials) > 1 {
+		if vars, ok := queryPartials[1].(map[string]interface{}); ok {
+			variables = vars
+		}
+	}
 
 	if query == "" {
+		errPairs := errPairsPool.Get().(map[string]interface{})
+		errPairs["error"] = "query is empty"
 		b.Logger.Error(&libpack_logger.LogMessage{
 			Message: "Can't compile query",
-			Pairs:   map[string]interface{}{"error": "query is empty"},
+			Pairs:   errPairs,
 		})
+		errPairsPool.Put(errPairs)
 		return nil
 	}
 
-	jsonQuery := b.convertToJSON(&Query{Query: query, Variables: variables})
-	return &Query{
+	// Construct the query object once, then attach its JSON form
+	q := &Query{
 		Query:     query,
 		Variables: variables,
-		JsonQuery: jsonQuery,
 	}
+	q.JsonQuery = b.convertToJSON(q)
+	return q
 }
 
 func (b *BaseClient) Query(query string, variables map[string]interface{}, headers map[string]interface{}) (any, error) {
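Putting it together, a hypothetical call through the Query method whose signature appears above; the import path and the already-constructed *BaseClient are assumptions, since client setup is not part of this diff:

package main

import (
	"fmt"

	gql "github.com/lukaszraczylo/go-simple-graphql"
)

func run(client *gql.BaseClient) {
	result, err := client.Query(
		`query ($id: ID!) { user(id: $id) { name } }`,
		map[string]interface{}{"id": "42"}, // variables
		nil,                                // no extra headers
	)
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	fmt.Println(result)
}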
