diff --git a/cache/cache.go b/cache/cache.go index b2025fb..afdf163 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -3,6 +3,7 @@ package libpack_cache import ( "bytes" "compress/gzip" + "hash/fnv" "io" "sync" "time" @@ -13,12 +14,25 @@ type CacheEntry struct { Value []byte } +const shardCount = 256 // Must be a power of 2: getShard masks with shardCount-1 + +type shard struct { + entries map[string]CacheEntry + sync.RWMutex +} + type Cache struct { compressPool sync.Pool decompressPool sync.Pool - entries sync.Map - globalTTL time.Duration - sync.RWMutex + shards [shardCount]*shard + globalTTL time.Duration +} + +// getShard returns the appropriate shard for a given key +func (c *Cache) getShard(key string) *shard { + hash := fnv.New32a() + hash.Write([]byte(key)) + return c.shards[hash.Sum32()&(shardCount-1)] } func New(globalTTL time.Duration) *Cache { @@ -32,13 +46,19 @@ func New(globalTTL time.Duration) *Cache { }, decompressPool: sync.Pool{ New: func() interface{} { - // Ensure that new is returning a new reader initialized with an empty byte buffer r, _ := gzip.NewReader(bytes.NewReader([]byte{})) return r }, }, } + // Initialize shards + for i := 0; i < shardCount; i++ { + cache.shards[i] = &shard{ + entries: make(map[string]CacheEntry), + } + } + go cache.cleanupRoutine(globalTTL) return cache } @@ -52,33 +72,43 @@ func (c *Cache) cleanupRoutine(globalTTL time.Duration) { } } func (c *Cache) Set(key string, value []byte, ttl time.Duration) { - c.Lock() // use the lock - defer c.Unlock() - - expiresAt := time.Now().Add(ttl) compressedValue, err := c.compress(value) if err != nil { return } + // Compress before taking the shard lock so gzip work does not block other readers and writers of this shard. + shard := c.getShard(key) + shard.Lock() + defer shard.Unlock() - entry := CacheEntry{ + shard.entries[key] = CacheEntry{ Value: compressedValue, - ExpiresAt: expiresAt, + ExpiresAt: time.Now().Add(ttl), } - c.entries.Store(key, entry) } func (c *Cache) Get(key string) ([]byte, bool) { - c.RLock() // use the read lock - defer c.RUnlock() + shard := c.getShard(key) + shard.RLock() + entry, ok := 
shard.entries[key] + if !ok { + shard.RUnlock() + return nil, false + } - entry, ok := c.entries.Load(key) - if !ok || entry.(CacheEntry).ExpiresAt.Before(time.Now()) { + if entry.ExpiresAt.Before(time.Now()) { + shard.RUnlock() + // Clean up expired entry in background + go func() { + shard.Lock() + // Re-check under the write lock: a concurrent Set may have refreshed this key after the read lock was released; an unconditional delete would drop the fresh entry. + if cur, ok := shard.entries[key]; ok && cur.ExpiresAt.Before(time.Now()) { + delete(shard.entries, key) + } + shard.Unlock() + }() return nil, false } - compressedValue := entry.(CacheEntry).Value - value, err := c.decompress(compressedValue) + shard.RUnlock() + + value, err := c.decompress(entry.Value) if err != nil { return nil, false } @@ -87,26 +117,23 @@ func (c *Cache) Get(key string) ([]byte, bool) { } func (c *Cache) Delete(key string) { - c.Lock() - defer c.Unlock() - - _, ok := c.entries.Load(key) - if !ok { - return - } - - c.entries.Delete(key) + shard := c.getShard(key) + shard.Lock() + delete(shard.entries, key) + shard.Unlock() } func (c *Cache) CleanExpiredEntries() { now := time.Now() - c.entries.Range(func(key, value interface{}) bool { - entry := value.(CacheEntry) - if entry.ExpiresAt.Before(now) { - c.entries.Delete(key) + for _, shard := range c.shards { + shard.Lock() + for key, entry := range shard.entries { + if entry.ExpiresAt.Before(now) { + delete(shard.entries, key) + } } - return true - }) + shard.Unlock() + } } func (c *Cache) compress(data []byte) ([]byte, error) { diff --git a/execute_query.go b/execute_query.go index 5999b53..676c5ae 100644 --- a/execute_query.go +++ b/execute_query.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "net/http" - "sync" "time" "github.com/avast/retry-go/v4" @@ -15,11 +14,22 @@ import ( libpack_logger "github.com/lukaszraczylo/go-simple-graphql/logging" ) -var bufferPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, -} +var ( + // Shared HTTP transport with optimized settings + defaultTransport = &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + DisableCompression: false, // Enable compression for responses + 
ForceAttemptHTTP2: true, + } + + // Shared HTTP client with timeouts + defaultClient = &http.Client{ + Transport: defaultTransport, + Timeout: 30 * time.Second, + } +) func (qe *QueryExecutor) executeQuery() ([]byte, error) { // Reuse buffer from pool to avoid allocations @@ -62,7 +72,12 @@ func (qe *QueryExecutor) executeQuery() ([]byte, error) { httpRequest.Body = io.NopCloser(bytes.NewReader(buf.Bytes())) httpRequest.ContentLength = int64(buf.Len()) - httpResponse, err := qe.client.Do(httpRequest) + // Use default client if custom client is not set + client := qe.client + if client == nil { + client = defaultClient + } + httpResponse, err := client.Do(httpRequest) if err != nil { return err } @@ -88,14 +103,18 @@ func (qe *QueryExecutor) executeQuery() ([]byte, error) { reader = httpResponse.Body } - // Read all response body - body, err := io.ReadAll(reader) + // Use buffer pool for reading response + respBuf := bufferPool.Get().(*bytes.Buffer) + respBuf.Reset() + defer bufferPool.Put(respBuf) + + _, err = io.Copy(respBuf, reader) if err != nil { return fmt.Errorf("error reading HTTP response: %w", err) } - // Unmarshal response - err = json.Unmarshal(body, &queryResult) + // Unmarshal response directly from buffer + err = json.Unmarshal(respBuf.Bytes(), &queryResult) if err != nil { return fmt.Errorf("error unmarshalling HTTP response: %w", err) } @@ -111,6 +130,7 @@ func (qe *QueryExecutor) executeQuery() ([]byte, error) { retry.Attempts(uint(retriesMax)), retry.DelayType(retry.BackOffDelay), retry.Delay(time.Duration(qe.retries_delay)), + retry.MaxDelay(10 * time.Second), retry.LastErrorOnly(true), ) if err != nil { diff --git a/pools.go b/pools.go new file mode 100644 index 0000000..d9804d5 --- /dev/null +++ b/pools.go @@ -0,0 +1,24 @@ +package gql + +import ( + "bytes" + "sync" +) + +var ( + // Pre-sized buffer pool for better performance + bufferPool = sync.Pool{ + New: func() interface{} { + // Pre-allocate buffer with 4KB capacity + b := 
bytes.NewBuffer(make([]byte, 0, 4096)) + return b + }, + } +) diff --git a/query.go b/query.go index a1de3ff..8d470cb 100644 --- a/query.go +++ b/query.go @@ -11,56 +11,73 @@ import ( ) func (b *BaseClient) convertToJSON(v any) []byte { - // Reuse buffer to reduce allocations buf := bufferPool.Get().(*bytes.Buffer) defer bufferPool.Put(buf) buf.Reset() - // Use json.Marshal to avoid adding extra newline - jsonData, err := json.Marshal(v) - if err != nil { + // Use json.NewEncoder for better performance with buffers + enc := json.NewEncoder(buf) + enc.SetEscapeHTML(false) // Reduce unnecessary escaping + + if err := enc.Encode(v); err != nil { b.Logger.Error(&libpack_logger.LogMessage{ Message: "Can't convert to JSON", Pairs: map[string]interface{}{"error": err.Error()}, }) return nil } - // Copy the bytes to the buffer - buf.Write(jsonData) + // Trim the trailing newline Encode adds; named "out" to avoid shadowing the imported bytes package. + out := buf.Bytes() + if len(out) > 0 && out[len(out)-1] == '\n' { + out = out[:len(out)-1] + } - // Copy the bytes to a new slice before returning - result := make([]byte, buf.Len()) - copy(result, buf.Bytes()) + // Make a copy of the bytes since the buffer will be reused + result := make([]byte, len(out)) + copy(result, out) return result } func (b *BaseClient) compileQuery(queryPartials ...any) *Query { var query string var variables map[string]interface{} for _, partial := range queryPartials { switch val := partial.(type) { case string: query = 
val case map[string]interface{}: variables = val } } if query == "" { b.Logger.Error(&libpack_logger.LogMessage{ Message: "Can't compile query", Pairs: map[string]interface{}{"error": "query is empty"}, }) return nil } - jsonQuery := b.convertToJSON(&Query{Query: query, Variables: variables}) - return &Query{ + // Construct query object once + q := &Query{ Query: query, Variables: variables, - JsonQuery: jsonQuery, } + q.JsonQuery = b.convertToJSON(q) + return q } func (b *BaseClient) Query(query string, variables map[string]interface{}, headers map[string]interface{}) (any, error) {