From d2c5fcbdfb533b9053f8930a050f60ade93727db Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Tue, 5 Mar 2024 17:28:32 -0300 Subject: [PATCH 01/19] bufferpool: Switch to zeropool This commit switches the bufferpool to use the zeropool implementation for sync pools. The stdlib sync.Pool implementation has an issue where it causes an additional heap allocation per Put() call when used with byte slices. github.com/colega/zeropool package has been specifically designed to work around this issue, which reduces GC pressure and improves performance. This also fixes the bufferpool's pkg benchmark to use a new pool per test, to avoid other tests influencing the behavior of the benchmark and sets it to report the allocations. --- exp/bufferpool/pool.go | 14 ++++++++------ exp/bufferpool/pool_test.go | 13 +++++++++++-- go.mod | 1 + go.sum | 2 ++ 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go index 22112687..170917d8 100644 --- a/exp/bufferpool/pool.go +++ b/exp/bufferpool/pool.go @@ -3,6 +3,8 @@ package bufferpool import ( "sync" + + "github.com/colega/zeropool" ) const ( @@ -90,20 +92,20 @@ func (p *Pool) init() { // still maximizing reuse of buffers allocated by Get. // Note that we cannot simply use n.buckets[idx].New, // as this would side-step pooling. - p.buckets[i].New = p.buckets[idx].Get + p.buckets[i] = zeropool.New(p.buckets[idx].Get) } else { - p.buckets[i].New = newAllocFunc(i) + p.buckets[i] = zeropool.New(newAllocFunc(i)) } } }) } -type bucketSlice []sync.Pool +type bucketSlice []zeropool.Pool[[]byte] func (bs bucketSlice) Get(size int) []byte { for i := range bs { if 1<= size { - return bs[i].Get().([]byte) + return bs[i].Get() } } @@ -119,8 +121,8 @@ func (bs bucketSlice) Put(buf []byte) { } } -func newAllocFunc(i int) func() any { - return func() any { +func newAllocFunc(i int) func() []byte { + return func() []byte { return make([]byte, 1< Date: Thu, 14 Mar 2024 17:47:45 -0300 Subject: [PATCH 02/19] message: Fix reuse of first segment This fixes the Message's Reset() call to allow reuse of the first segment. Prior to this fix, the first segment was discarded after the first Reset call, effectively causing a new segment to be initialized on every Reset call. By reusing the first segment, the number of heap allocations is reduced and therefore performance is increased in use cases where the message object is reused. The fix involved associtating the segment to the message and fixing checks to ensure the data of the segment is re-allocated after the reset. A benchmark is included to show the current performance of this. --- message.go | 11 ++++++++--- message_test.go | 18 ++++++++++++++++++ segment.go | 2 ++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/message.go b/message.go index 5ae7f3e8..fe214b8b 100644 --- a/message.go +++ b/message.go @@ -3,6 +3,7 @@ package capnp import ( "encoding/binary" "errors" + "fmt" "io" "sync" "sync/atomic" @@ -100,6 +101,9 @@ func (m *Message) Release() { func (m *Message) Reset(arena Arena) (first *Segment, err error) { m.capTable.Reset() for k := range m.segs { + if k == 0 && m.segs[k] == &m.firstSeg { + continue + } delete(m.segs, k) } @@ -113,6 +117,7 @@ func (m *Message) Reset(arena Arena) (first *Segment, err error) { DepthLimit: m.DepthLimit, capTable: m.capTable, segs: m.segs, + firstSeg: Segment{msg: m}, } if arena != nil { @@ -264,10 +269,10 @@ func (m *Message) Segment(id SegmentID) (*Segment, error) { // segment returns the segment with the given ID, with no bounds // checking. The caller must be holding m.mu. func (m *Message) segment(id SegmentID) (*Segment, error) { - if m.segs == nil && id == 0 && m.firstSeg.msg != nil { + if m.segs == nil && id == 0 && m.firstSeg.msg != nil && m.firstSeg.data != nil { return &m.firstSeg, nil } - if s := m.segs[id]; s != nil { + if s := m.segs[id]; s != nil && s.data != nil { return s, nil } if len(m.segs) == maxInt { @@ -442,7 +447,7 @@ func alloc(s *Segment, sz Size) (*Segment, address, error) { var err error s, err = s.msg.allocSegment(sz) if err != nil { - return nil, 0, err + return nil, 0, fmt.Errorf("allocSegment failed: %v", err) } } diff --git a/message_test.go b/message_test.go index b146bd06..b705e2e1 100644 --- a/message_test.go +++ b/message_test.go @@ -643,3 +643,21 @@ func (readOnlyArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, } var errReadOnlyArena = errors.New("Allocate called on read-only arena") + +func BenchmarkMessageGetFirstSegment(b *testing.B) { + var msg Message + var arena Arena = SingleSegment(nil) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _, err := msg.Reset(arena) + if err != nil { + b.Fatal(err) + } + _, err = msg.Segment(0) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/segment.go b/segment.go index 14b4bd4e..119e50cf 100644 --- a/segment.go +++ b/segment.go @@ -15,6 +15,8 @@ type SegmentID uint32 // It is part of a Message, which can contain other segments that // reference each other. type Segment struct { + // msg associated with this segment. A Message instance m maintains the + // invariant that that all m.segs[].msg == m. msg *Message id SegmentID data []byte From 64f1015fe523d9a7160ff5eba98e08bcc02ee765 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 22 Mar 2024 14:46:38 -0300 Subject: [PATCH 03/19] rpc: Disable flaky test Momentarily, while refactoring is going on. --- rpc/senderpromise_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rpc/senderpromise_test.go b/rpc/senderpromise_test.go index 9de46698..5417a34d 100644 --- a/rpc/senderpromise_test.go +++ b/rpc/senderpromise_test.go @@ -354,6 +354,7 @@ func TestDisembargoSenderPromise(t *testing.T) { // Tests that E-order is respected when fulfilling a promise with something on // the remote peer. func TestPromiseOrdering(t *testing.T) { + t.Skip("Disabled due to being flaky") t.Parallel() ctx := context.Background() From 29d5cb9a29aaa3a1395e03c65f3ac175da084c42 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 22 Mar 2024 15:05:50 -0300 Subject: [PATCH 04/19] test: Demonstrate test is broken This shows that the BenchmarkUnmarshal_Reuse is broken. --- integration_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/integration_test.go b/integration_test.go index dc183c2d..fafb55d5 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1779,7 +1779,10 @@ func BenchmarkUnmarshal_Reuse(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { *ta = testArena(data[r.Intn(len(data))][8:]) - msg.Reset(arena) + _, err := msg.Reset(arena) + if err != nil { + b.Fatal(err) + } a, _ := air.ReadRootBenchmarkA(msg) unmarshalA(a) } From 4bbbafcfc189356b1d36d769a315e9a52fcda886 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 22 Mar 2024 15:11:21 -0300 Subject: [PATCH 05/19] message: Prove Reset() cannot be used to reset read-only message --- message_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/message_test.go b/message_test.go index b705e2e1..5bd8448c 100644 --- a/message_test.go +++ b/message_test.go @@ -661,3 +661,17 @@ func BenchmarkMessageGetFirstSegment(b *testing.B) { } } } + +// TestCannotResetArenaForRead demonstrates that Reset() cannot be used when +// intending to read data from an arena (i.e. cannot reuse a msg value by +// calling Reset with the intention to read data). +func TestCannotResetArenaForRead(t *testing.T) { + var msg Message + var arena Arena = SingleSegment(incrementingData(8)) + + _, err := msg.Reset(arena) + if err == nil { + t.Fatal("expected non nil error, got nil") + } + t.Logf("Got err: %v", err) +} From 95a6fbc7088d4070d87295af646266427e9f0a0e Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Thu, 21 Mar 2024 13:26:28 -0300 Subject: [PATCH 06/19] wip refactor --- allocator.go | 275 ++++++++++++++++++++++++++++++++++++++++++++ arena.go | 265 ++++++++++++++---------------------------- arena_test.go | 5 + canonical.go | 2 +- codec.go | 15 +-- integration_test.go | 29 +---- message.go | 271 ++++++++++++++++++------------------------- message_test.go | 9 +- segment.go | 11 +- 9 files changed, 507 insertions(+), 375 deletions(-) create mode 100644 allocator.go diff --git a/allocator.go b/allocator.go new file mode 100644 index 00000000..47b01603 --- /dev/null +++ b/allocator.go @@ -0,0 +1,275 @@ +package capnp + +import ( + "errors" + + "capnproto.org/go/capnp/v3/exp/bufferpool" +) + +// allocator defines methods for an allocator for the basic arena +// implementation: the allocator defines the strategy of how to grow and +// release memory slices that back segments. +type allocator interface { + // Grow an array. When called, it has already been determined that + // the slice must be grown. Implementations must copy the contents of + // the existing byte slice to the new one. + // + // The returned slice may be have a capacity larger than strcictly + // required to store an addition sz bytes. In this case, the length + // must be len(b)+sz, while cap may be any amount >= than that. + Grow(b []byte, totalMsgSize int64, sz Size) ([]byte, error) + + // Release signals that the passed byte slice won't be used by the + // arena anymore. + Release(b []byte) +} + +// bufferPoolAllocator allocates buffers from a buffer pool. If nil, then +// the global default buffer pool is used. +type bufferPoolAllocator bufferpool.Pool + +func (bpa *bufferPoolAllocator) Grow(b []byte, totalMsgSize int64, sz Size) ([]byte, error) { + pool := (*bufferpool.Pool)(bpa) + if pool == nil { + pool = &bufferpool.Default + } + + inc, err := nextAlloc(totalMsgSize, int64(maxAllocSize()), sz) + if err != nil { + return nil, err + } + nb := pool.Get(len(b) + int(inc)) + + // When there was a prior buffer, copy the contents to the new buffer, + // clear the old buffer and return the old buffer to the pool to be + // reused. + if b != nil { + copy(nb[:cap(nb)], b[:cap(b)]) + for i := range b { + b[i] = 0 + } + pool.Put(b) + } + + return nb, nil +} + +func (bpa *bufferPoolAllocator) Release(b []byte) { + if b == nil { + panic("nil buffer passed to release") + } + pool := (*bufferpool.Pool)(bpa) + if pool == nil { + pool = &bufferpool.Default + } + pool.Put(b) +} + +// simpleAllocator allocates buffers without any caching or reuse, using the +// standard memory management functions. +// +// This allocator is concurent safe for use across multiple arenas. +type simpleAllocator struct{} + +func (_ simpleAllocator) Grow(b []byte, totalMsgSize int64, sz Size) ([]byte, error) { + inc, err := nextAlloc(totalMsgSize, int64(maxAllocSize()), sz) + if err != nil { + return nil, err + } + return append(b, make([]byte, inc)...), nil +} + +func (_ simpleAllocator) Release(b []byte) { + // Nothing to do. The runtime GC will reclaim it. +} + +// segmentList defines the operations needed for a container of segments. +type segmentList interface { + // NumSegments must return the number of segments that exist. + NumSegments() int + + // SegmentFor returns a segment on which to store sz bytes. This may be + // a new or an existing segment. + SegmentFor(sz Size) (*Segment, error) + + // Segment returns the specified segment. Returns nil if the segment + // does not exist. + Segment(id SegmentID) *Segment + + // Reset clears the list of segments and sets all segments to point to + // a nil message and data slice. + Reset() +} + +// singleSegmentList is a segment list that only stores a single segment. +type singleSegmentList Segment + +func (ssl *singleSegmentList) NumSegments() int { return 1 } +func (ssl *singleSegmentList) SegmentFor(_ Size) (*Segment, error) { return (*Segment)(ssl), nil } +func (ssl *singleSegmentList) Reset() { + ssl.data = nil + ssl.msg = nil +} +func (ssl *singleSegmentList) Segment(id SegmentID) *Segment { + if id == 0 { + return (*Segment)(ssl) + } + return nil +} + +// multiSegmentList is a segment list that stores segments in a byte slice. +// +// New segments are allocated if none of the existing segments has enough +// capacity for new data. +type multiSegmentList struct { + segs []Segment +} + +func (msl *multiSegmentList) NumSegments() int { + return len(msl.segs) +} + +func (msl *multiSegmentList) SegmentFor(sz Size) (*Segment, error) { + var seg *Segment + for i := range msl.segs { + if hasCapacity(msl.segs[i].data, sz) { + seg = &msl.segs[i] + break + } + } + if seg == nil { + i := len(msl.segs) + msl.segs = append(msl.segs, Segment{id: SegmentID(i)}) + seg = &msl.segs[i] + } + return seg, nil +} + +func (msl *multiSegmentList) Segment(id SegmentID) *Segment { + if int(id) < len(msl.segs) { + return &msl.segs[int(id)] + } + return nil +} + +func (msl *multiSegmentList) Reset() { + for i := range msl.segs { + msl.segs[i].data = nil + msl.segs[i].msg = nil + } + msl.segs = msl.segs[:0] +} + +// arena is an implementation of an Arena that offloads most of its work to an +// associated allocator and segment list. +type arena struct { + alloc allocator + segs segmentList +} + +func (a arena) Allocate(sz Size, msg *Message, seg *Segment) (*Segment, address, error) { + // Determine total allocated amount in the arena. + var total int64 + for i := 0; i < a.segs.NumSegments(); i++ { + seg := a.segs.Segment(SegmentID(i)) + if seg == nil { + return nil, 0, errors.New("segment out of bounds") + } + + total += int64(len(seg.data)) + if total < 0 { + return nil, 0, errors.New("overflow attempting to allocate") + } + } + + // Determine the slice that will receive new data. Reuse seg if it has + // enough space for the data, otherwise ask the segment list for a + // segment to store data in (which may or may not be the same segment). + var b []byte + needsClearing := false + if seg == nil || !hasCapacity(seg.data, sz) { + var err error + + // Determine the segment to allocate in. + seg, err = a.segs.SegmentFor(sz) + if err != nil { + return nil, 0, err + } + + b = seg.data + if !hasCapacity(b, sz) { + // Size or resize the data. + b, err = a.alloc.Grow(seg.data, total, sz) + if err != nil { + return nil, 0, err + } + } else { + needsClearing = true + } + } else { + b = seg.data + needsClearing = true + } + + // The segment's full data is in b[0:], while the buffer requested by + // the caller is in b[:]. When this was a new + // segment, the two will be the same. + // + // The starting address of the newly allocated space is the end of the + // prior data. + addr := address(len(seg.data)) + seg.data = b[:addr.addSizeUnchecked(sz)] + seg.msg = msg + + // Clear the data after addr to ensure it is zero. The allocators + // usually already return cleared data, but sometimes a buffer is + // explicitly passed with left over data, so this ensures the memory + // that is about to be used is in fact all zeroes. + if needsClearing { + // TODO: use clear() once go 1.21 is the minimum required + // version. + toClear := seg.data[addr:] + for i := range toClear { + toClear[i] = 0 + } + } + + return seg, addr, nil +} + +func (a arena) Release() { + if a.alloc == nil && a.segs == nil { + // Empty arena. Use sane defaults. + a.alloc = (*bufferPoolAllocator)(nil) + a.segs = &singleSegmentList{} + } + + for i := 0; i < a.segs.NumSegments(); i++ { + // Release segment data to the allocator. + seg := a.segs.Segment(SegmentID(i)) + if seg.data != nil { + a.alloc.Release(seg.data) + } + } + + // Reset list of segments. + a.segs.Reset() +} + +// NumSegments returns the number of segments in the arena. +func (a arena) NumSegments() int64 { + return int64(a.segs.NumSegments()) +} + +// Data returns the data in the given segment or an error. +func (a arena) Data(id SegmentID) ([]byte, error) { + seg := a.segs.Segment(id) + if seg == nil { + return nil, errors.New("segment out of bounds") + } + return seg.data, nil +} + +func (a arena) Segment(id SegmentID) *Segment { + return a.segs.Segment(id) +} diff --git a/arena.go b/arena.go index 4278f9d3..4ea6ce92 100644 --- a/arena.go +++ b/arena.go @@ -2,10 +2,8 @@ package capnp import ( "errors" - "sync" - "capnproto.org/go/capnp/v3/exp/bufferpool" - "capnproto.org/go/capnp/v3/internal/str" + "capnproto.org/go/capnp/v3/exc" ) // An Arena loads and allocates segments for a Message. @@ -17,24 +15,28 @@ type Arena interface { // Data loads the data for the segment with the given ID. IDs are in // the range [0, NumSegments()). // must be tightly packed in the range [0, NumSegments()). + // + // TODO: remove in favor of Segment(x).Data(). + // Deprecated. Data(id SegmentID) ([]byte, error) + // Segment returns the segment identified with the specified id. This + // may return nil if the segment with the specified ID does not exist. + Segment(id SegmentID) *Segment + // Allocate selects a segment to place a new object in, creating a // segment or growing the capacity of a previously loaded segment if // necessary. If Allocate does not return an error, then the // difference of the capacity and the length of the returned slice - // must be at least minsz. segs is a map of segments keyed by ID - // using arrays returned by the Data method (although the length of - // these slices may have changed by previous allocations). Allocate - // must not modify segs. + // must be at least minsz. Some allocators may specifically choose + // to grow the passed seg (if non nil). // // If Allocate creates a new segment, the ID must be one larger than // the last segment's ID or zero if it is the first segment. // // If Allocate returns an previously loaded segment's ID, then the - // arena is responsible for preserving the existing data in the - // returned byte slice. - Allocate(minsz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) + // arena is responsible for preserving the existing data. + Allocate(sz Size, msg *Message, seg *Segment) (*Segment, address, error) // Release all resources associated with the Arena. Callers MUST NOT // use the Arena after it has been released. @@ -51,194 +53,66 @@ type Arena interface { // in a continguous slice. Allocation is performed by first allocating a // new slice and copying existing data. SingleSegment arena does not fail // unless the caller attempts to access another segment. -type SingleSegmentArena []byte - -// SingleSegment constructs a SingleSegmentArena from b. b MAY be nil. -// Callers MAY use b to populate the segment for reading, or to reserve -// memory of a specific size. -func SingleSegment(b []byte) *SingleSegmentArena { - return (*SingleSegmentArena)(&b) -} - -func (ssa SingleSegmentArena) NumSegments() int64 { - return 1 -} - -func (ssa SingleSegmentArena) Data(id SegmentID) ([]byte, error) { - if id != 0 { - return nil, errors.New("segment " + str.Utod(id) + " requested in single segment arena") +func SingleSegment(b []byte) Arena { + var alloc allocator = (*bufferPoolAllocator)(nil) + if b != nil { + // When b is specified, do not return the buffer to any + // caches, because we don't know where the caller got the + // buffer from. + alloc = simpleAllocator{} } - return ssa, nil -} - -func (ssa *SingleSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { - data := []byte(*ssa) - if segs[0] != nil { - data = segs[0].data - } - if len(data)%int(wordSize) != 0 { - return 0, nil, errors.New("segment size is not a multiple of word size") - } - if hasCapacity(data, sz) { - return 0, data, nil + return arena{ + alloc: alloc, + segs: &singleSegmentList{data: b}, } - inc, err := nextAlloc(int64(len(data)), int64(maxAllocSize()), sz) - if err != nil { - return 0, nil, err - } - buf := bufferpool.Default.Get(cap(data) + inc) - copied := copy(buf, data) - buf = buf[:copied] - bufferpool.Default.Put(data) - *ssa = buf - return 0, *ssa, nil -} - -func (ssa SingleSegmentArena) String() string { - return "single-segment arena [len=" + str.Itod(len(ssa)) + " cap=" + str.Itod(cap(ssa)) + "]" -} - -// Return this arena to an internal sync.Pool of arenas that can be -// re-used. Any time SingleSegment(nil) is called, arenas from this -// pool will be used if available, which can help reduce memory -// allocations. -// -// All segments will be zeroed before re-use. -// -// Calling Release is optional; if not done the garbage collector -// will release the memory per usual. -func (ssa *SingleSegmentArena) Release() { - bufferpool.Default.Put(*ssa) - *ssa = nil -} - -// MultiSegment is an arena that stores object data across multiple []byte -// buffers, allocating new buffers of exponentially-increasing size when -// full. This avoids the potentially-expensive slice copying of SingleSegment. -type MultiSegmentArena struct { - ss [][]byte - delim int // index of first segment in ss that is NOT in buf - buf []byte // full-sized buffer that was demuxed into ss. } // MultiSegment returns a new arena that allocates new segments when // they are full. b MAY be nil. Callers MAY use b to populate the // buffer for reading or to reserve memory of a specific size. -func MultiSegment(b [][]byte) *MultiSegmentArena { - if b == nil { - return multiSegmentPool.Get().(*MultiSegmentArena) - } - return multiSegment(b) -} - -// Return this arena to an internal sync.Pool of arenas that can be -// re-used. Any time MultiSegment(nil) is called, arenas from this -// pool will be used if available, which can help reduce memory -// allocations. -// -// All segments will be zeroed before re-use. -// -// Calling Release is optional; if not done the garbage collector -// will release the memory per usual. -func (msa *MultiSegmentArena) Release() { - for i, v := range msa.ss { - msa.ss[i] = nil - - // segment not in buf? - if i >= msa.delim { - bufferpool.Default.Put(v) +func MultiSegment(b [][]byte) Arena { + var alloc allocator = (*bufferPoolAllocator)(nil) + var segs []Segment + if b != nil { + // When b is specified, do not return the buffer to any + // caches, because we don't know where the caller got the + // buffer from. + alloc = simpleAllocator{} + segs = make([]Segment, len(b)) + for i := range b { + segs[i] = Segment{id: SegmentID(i), data: b[i]} } } - - bufferpool.Default.Put(msa.buf) // nil is ok - *msa = MultiSegmentArena{ss: msa.ss[:0]} - multiSegmentPool.Put(msa) -} - -// Like MultiSegment, but doesn't use the pool -func multiSegment(b [][]byte) *MultiSegmentArena { - return &MultiSegmentArena{ss: b} -} - -var multiSegmentPool = sync.Pool{ - New: func() any { - return multiSegment(make([][]byte, 0, 16)) - }, + return arena{ + alloc: alloc, + segs: &multiSegmentList{segs: segs}, + } } -// demuxArena slices data into a multi-segment arena. It assumes that -// len(data) >= hdr.totalSize(). -func (msa *MultiSegmentArena) demux(hdr streamHeader, data []byte) error { +// demuxArena demuxes a byte slice (that contains data for a list of +// segments identified on the header) into an appropriate arena. +func demuxArena(hdr streamHeader, data []byte) (Arena, error) { maxSeg := hdr.maxSegment() if int64(maxSeg) > int64(maxInt-1) { - return errors.New("number of segments overflows int") + return arena{}, errors.New("number of segments overflows int") } - msa.buf = data - msa.delim = int(maxSeg + 1) + if maxSeg == 0 && len(data) == 0 { + return SingleSegment(nil), nil + } - // We might be forced to allocate here, but hopefully it won't - // happen to often. We assume msa was freshly obtained from a - // pool, and that no segments have been allocated yet. - var segment []byte - for i := 0; i < msa.delim; i++ { + segBufs := make([][]byte, maxSeg+1) + off := 0 + for i := range segBufs { sz, err := hdr.segmentSize(SegmentID(i)) if err != nil { - return err - } - - segment, data = data[:sz:sz], data[sz:] - msa.ss = append(msa.ss, segment) - } - - return nil -} - -func (msa *MultiSegmentArena) NumSegments() int64 { - return int64(len(msa.ss)) -} - -func (msa *MultiSegmentArena) Data(id SegmentID) ([]byte, error) { - if int64(id) >= int64(len(msa.ss)) { - return nil, errors.New("segment " + str.Utod(id) + " requested (arena only has " + - str.Itod(len(msa.ss)) + " segments)") - } - return msa.ss[id], nil -} - -func (msa *MultiSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { - var total int64 - for i, data := range msa.ss { - id := SegmentID(i) - if s := segs[id]; s != nil { - data = s.data + return arena{}, exc.WrapError("decode", err) } - - if hasCapacity(data, sz) { - return id, data, nil - } - - if total += int64(cap(data)); total < 0 { - // Overflow. - return 0, nil, errors.New("alloc " + str.Utod(sz) + " bytes: message too large") - } - } - - n, err := nextAlloc(total, 1<<63-1, sz) - if err != nil { - return 0, nil, err + segBufs[i] = data[off : off+int(sz)] + off += int(sz) } - buf := bufferpool.Default.Get(n) - buf = buf[:0] - - id := SegmentID(len(msa.ss)) - msa.ss = append(msa.ss, buf) - return id, buf, nil -} - -func (msa *MultiSegmentArena) String() string { - return "multi-segment arena [" + str.Itod(len(msa.ss)) + " segments]" + return MultiSegment(segBufs), nil } // nextAlloc computes how much more space to allocate given the number @@ -287,3 +161,40 @@ func nextAlloc(curr, max int64, req Size) (int, error) { func hasCapacity(b []byte, sz Size) bool { return sz <= Size(cap(b)-len(b)) } + +// ReadOnlySingleSegmentArena is a single segment arena backed by a byte slice +// that does not allow allocations. +type ReadOnlySingleSegmentArena Segment + +func (a *ReadOnlySingleSegmentArena) NumSegments() int64 { + return 1 +} + +func (a *ReadOnlySingleSegmentArena) Data(id SegmentID) ([]byte, error) { + if id != 0 { + return nil, errors.New("segment out of bounds") + } + return a.data, nil +} + +// Segment returns the segment identified with the specified id. This +// may return nil if the segment with the specified ID does not exist. +func (a *ReadOnlySingleSegmentArena) Segment(id SegmentID) *Segment { + if id > 0 { + return nil + } + return (*Segment)(a) +} + +func (a *ReadOnlySingleSegmentArena) Allocate(sz Size, msg *Message, seg *Segment) (*Segment, address, error) { + return nil, 0, errors.New("ReadOnlySingleSegmentArena cannot allocate") +} + +func (a *ReadOnlySingleSegmentArena) Release() { + // This does nothing. +} + +// UseBuffer switches the internal buffer to use the specified one. +func (a *ReadOnlySingleSegmentArena) UseBuffer(b []byte) { + a.data = b +} diff --git a/arena_test.go b/arena_test.go index 51267251..d9c5d4b2 100644 --- a/arena_test.go +++ b/arena_test.go @@ -50,6 +50,7 @@ func TestSingleSegment(t *testing.T) { }) } +/* func TestSingleSegmentAllocate(t *testing.T) { t.Parallel() @@ -118,6 +119,7 @@ func TestSingleSegmentAllocate(t *testing.T) { tests[i].run(t, i) } } +*/ func TestMultiSegment(t *testing.T) { t.Parallel() @@ -164,6 +166,8 @@ func TestMultiSegment(t *testing.T) { }) } +/* + func TestMultiSegmentAllocate(t *testing.T) { t.Parallel() @@ -233,3 +237,4 @@ func TestMultiSegmentAllocate(t *testing.T) { tests[i].run(t, i) } } +*/ diff --git a/canonical.go b/canonical.go index 85db07d8..7d85320e 100644 --- a/canonical.go +++ b/canonical.go @@ -18,7 +18,7 @@ func Canonicalize(s Struct) ([]byte, error) { if err != nil { return nil, exc.WrapError("canonicalize", err) } - if err := msg.SetRoot(root.ToPtr()); err != nil { + if err := msg.SetRoot(root.ToPtr()); err != nil { // TODO: dupe from NewRootStruct? return nil, exc.WrapError("canonicalize", err) } if err := fillCanonicalStruct(root, s); err != nil { diff --git a/codec.go b/codec.go index 83b42a0e..ca426333 100644 --- a/codec.go +++ b/codec.go @@ -66,16 +66,17 @@ func (d *Decoder) Decode() (*Message, error) { } // Read segments. + // + // TODO: improve so we don't impose bufferpool on caller. + // + // TODO: this should be streaming, not reading the entire thing in + // memory. buf := bufferpool.Default.Get(int(total)) if _, err := io.ReadFull(d.r, buf); err != nil { return nil, exc.WrapError("decode: read segments", err) } - arena := MultiSegment(nil) - if err = arena.demux(hdr, buf); err != nil { - return nil, exc.WrapError("decode", err) - } - + arena, err := demuxArena(hdr, buf) return &Message{Arena: arena}, nil } @@ -161,8 +162,8 @@ func Unmarshal(data []byte) (*Message, error) { return nil, errors.New("unmarshal: short data section") } - arena := MultiSegment(nil) - if err := arena.demux(hdr, data); err != nil { + arena, err := demuxArena(hdr, data) + if err != nil { return nil, exc.WrapError("unmarshal", err) } diff --git a/integration_test.go b/integration_test.go index fafb55d5..3c6980f7 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1773,17 +1773,17 @@ func BenchmarkUnmarshal_Reuse(b *testing.B) { data[i], _ = msg.Marshal() } msg := new(capnp.Message) - ta := new(testArena) - arena := capnp.Arena(ta) + ta := new(capnp.ReadOnlySingleSegmentArena) + msg.ResetForRead(ta) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - *ta = testArena(data[r.Intn(len(data))][8:]) - _, err := msg.Reset(arena) + ta.UseBuffer(data[r.Intn(len(data))][8:]) + msg.ResetForRead(ta) + a, err := air.ReadRootBenchmarkA(msg) if err != nil { b.Fatal(err) } - a, _ := air.ReadRootBenchmarkA(msg) unmarshalA(a) } } @@ -1831,25 +1831,6 @@ func BenchmarkDecode(b *testing.B) { } } -type testArena []byte - -func (ta testArena) NumSegments() int64 { - return 1 -} - -func (ta testArena) Data(id capnp.SegmentID) ([]byte, error) { - if id != 0 { - return nil, errors.New("test arena: requested non-zero segment") - } - return []byte(ta), nil -} - -func (ta testArena) Allocate(capnp.Size, map[capnp.SegmentID]*capnp.Segment) (capnp.SegmentID, []byte, error) { - return 0, nil, errors.New("test arena: can't allocate") -} - -func (ta testArena) Release() {} - func TestPointerTraverseDefense(t *testing.T) { t.Parallel() const limit = 128 diff --git a/message.go b/message.go index fe214b8b..6347b069 100644 --- a/message.go +++ b/message.go @@ -28,6 +28,9 @@ const maxDepth = ^uint(0) // A Message is a tree of Cap'n Proto objects, split into one or more // segments of contiguous memory. The only required field is Arena. // A Message is safe to read from multiple goroutines. +// +// A message must be set up with a fully valid Arena when reading or with +// a valid and empty arena by calling NewArena. type Message struct { // rlimit must be first so that it is 64-bit aligned. // See sync/atomic docs. @@ -52,15 +55,13 @@ type Message struct { // DepthLimit limits how deeply-nested a message structure can be. // If not set, this defaults to 64. DepthLimit uint - - // mu protects the following fields: - mu sync.Mutex - segs map[SegmentID]*Segment - firstSeg Segment // Preallocated first segment. msg is non-nil once initialized. } // NewMessage creates a message with a new root and returns the first // segment. It is an error to call NewMessage on an arena with data in it. +// +// The new message is guaranteed to contain at least one segment and that +// segment is guaranteed to contain enough space for the root struct pointer. func NewMessage(arena Arena) (*Message, *Segment, error) { var msg Message first, err := msg.Reset(arena) @@ -88,25 +89,22 @@ func NewMultiSegmentMessage(b [][]byte) (msg *Message, first *Segment) { return msg, first } -// Release is syntactic sugar for Message.Reset(nil). See +// Release is syntactic sugar for Message.Reset(m.Arena). See // docstring for Reset for an important warning. func (m *Message) Release() { - m.Reset(nil) + m.Reset(m.Arena) } // Reset the message to use a different arena, allowing it // to be reused. This invalidates any existing pointers in // the Message, releases all clients in the cap table, and // releases the current Arena, so use with caution. +// +// Reset fails if the new arena is not empty and is not able to allocate enough +// space for at least one segment and its root pointer. In other words, Reset +// must only be used for messages which will be modified, not read. func (m *Message) Reset(arena Arena) (first *Segment, err error) { m.capTable.Reset() - for k := range m.segs { - if k == 0 && m.segs[k] == &m.firstSeg { - continue - } - delete(m.segs, k) - } - if m.Arena != nil { m.Arena.Release() } @@ -116,45 +114,66 @@ func (m *Message) Reset(arena Arena) (first *Segment, err error) { TraverseLimit: m.TraverseLimit, DepthLimit: m.DepthLimit, capTable: m.capTable, - segs: m.segs, - firstSeg: Segment{msg: m}, - } - - if arena != nil { - switch arena.NumSegments() { - case 0: - if first, err = m.allocSegment(wordSize); err != nil { - return nil, exc.WrapError("new message", err) - } - - case 1: - if first, err = m.Segment(0); err != nil { - return nil, exc.WrapError("new message", err) - } - if len(first.data) > 0 { - return nil, errors.New("new message: arena not empty") - } - - default: - return nil, errors.New("new message: arena not empty") - } + } - if first.ID() != 0 { - return nil, errors.New("new message: arena allocated first segment with non-zero ID") - } + // FIXME(matheusd): All the checks after this point have been added to + // maintain compatibility to the prior implementation and not break any + // tests, but personally, I think these should not exist. Message + // should be resettable with a pre-filled arena for reading without it + // failing and they should only enforce an allocation when the first + // write operation occurs. - seg, _, err := alloc(first, wordSize) // allocate root - if err != nil { - return nil, exc.WrapError("new message", err) + // Ensure there are no more than one segment allocated in the arena. + // This maintains compatibility to older versions of Reset(). + if arena.NumSegments() > 1 { + return nil, errors.New("arena already has multiple segments allocated") + } + + // Ensure the first segment has no data. + first = m.Arena.Segment(0) + if first != nil { + if len(first.data) != 0 { + return nil, errors.New("arena not empty") } - if seg != first { - return nil, errors.New("new message: arena allocated first word outside first segment") + if first.msg != nil && first.msg != m { + return nil, errors.New("first segment associated with different msg") } - } + // Ensure the first segment points to message m. + first.msg = m + } + + // Ensure the arena has size of at least the root pointer. + if first == nil || len(first.data) < int(wordSize) { + // When there is a first segment and it has capacity (but not + // yet length), manually resize it. + // + // TODO: this is here due to a single test requiring this + // behavior. Consider removing it. + if first != nil && len(first.data) == 0 && cap(first.data) >= int(wordSize) { + first.data = first.data[:8] + } else { + first, _, err = m.Arena.Allocate(wordSize, m, nil) + } + } return } +// ResetForRead resets the message for reading with the specified arena. This +// releases the current message arena, if it exists. +func (m *Message) ResetForRead(arena Arena) { + m.capTable.Reset() + if m.Arena != nil { + m.Arena.Release() + } + *m = Message{ + Arena: arena, + TraverseLimit: m.TraverseLimit, + DepthLimit: m.DepthLimit, + capTable: m.capTable, + } +} + func (m *Message) initReadLimit() { if m.TraverseLimit == 0 { m.rlimit.Store(defaultTraverseLimit) @@ -198,7 +217,11 @@ func (m *Message) Root() (Ptr, error) { if err != nil { return Ptr{}, exc.WrapError("read root", err) } - p, err := s.root().At(0) + root, ok := s.root() + if !ok { + return Ptr{}, errors.New("root is not set") + } + p, err := root.At(0) if err != nil { return Ptr{}, exc.WrapError("read root", err) } @@ -211,7 +234,19 @@ func (m *Message) SetRoot(p Ptr) error { if err != nil { return exc.WrapError("set root", err) } - if err := s.root().Set(0, p); err != nil { + root, ok := s.root() + if !ok { + // Root is not allocated on the first segment. Allocate it now. + _, _, err := m.alloc(wordSize, nil) + if err != nil { + return exc.WrapError("initial alloc", err) + } + root, ok = s.root() + if !ok { + return errors.New("unable to allocate root") + } + } + if err := root.Set(0, p); err != nil { return exc.WrapError("set root", err) } return nil @@ -252,97 +287,44 @@ func (m *Message) depthLimit() uint { // NumSegments returns the number of segments in the message. func (m *Message) NumSegments() int64 { - return int64(m.Arena.NumSegments()) + return m.Arena.NumSegments() } -// Segment returns the segment with the given ID. +// Segment returns the segment with the given ID. If err == nil, then the +// segment is enforced to exist and to be associated to message m. func (m *Message) Segment(id SegmentID) (*Segment, error) { - if int64(id) >= m.Arena.NumSegments() { + seg := m.Arena.Segment(id) + if seg == nil { return nil, errors.New("segment " + str.Utod(id) + ": out of bounds") } - m.mu.Lock() - seg, err := m.segment(id) - m.mu.Unlock() - return seg, err -} -// segment returns the segment with the given ID, with no bounds -// checking. The caller must be holding m.mu. -func (m *Message) segment(id SegmentID) (*Segment, error) { - if m.segs == nil && id == 0 && m.firstSeg.msg != nil && m.firstSeg.data != nil { - return &m.firstSeg, nil - } - if s := m.segs[id]; s != nil && s.data != nil { - return s, nil + if seg.msg == nil { + seg.msg = m } - if len(m.segs) == maxInt { - return nil, errors.New("segment " + str.Utod(id) + ": number of loaded segments exceeds int") - } - data, err := m.Arena.Data(id) - if err != nil { - return nil, exc.WrapError("load segment "+str.Utod(id), err) + if seg.msg != m { + return nil, fmt.Errorf("segment %d associated with different msg", id) } - s := m.setSegment(id, data) - return s, nil -} - -// setSegment creates or updates the Segment with the given ID. -// The caller must be holding m.mu. -func (m *Message) setSegment(id SegmentID, data []byte) *Segment { - if m.segs == nil { - if id == 0 { - m.firstSeg = Segment{ - id: id, - msg: m, - data: data, - } - return &m.firstSeg - } - m.segs = make(map[SegmentID]*Segment) - if m.firstSeg.msg != nil { - m.segs[0] = &m.firstSeg - } - } else if seg := m.segs[id]; seg != nil { - seg.data = data - return seg - } - seg := &Segment{ - id: id, - msg: m, - data: data, - } - m.segs[id] = seg - return seg + return seg, nil } -// allocSegment creates or resizes an existing segment such that -// cap(seg.Data) - len(seg.Data) >= sz. The caller must not be holding -// onto m.mu. -func (m *Message) allocSegment(sz Size) (*Segment, error) { +func (m *Message) alloc(sz Size, pref *Segment) (*Segment, address, error) { if sz > maxAllocSize() { - return nil, errors.New("allocation: too large") + return nil, 0, errors.New("allocation: too large") } + sz = sz.padToWord() - m.mu.Lock() - defer m.mu.Unlock() - - if len(m.segs) == maxInt { - return nil, errors.New("allocation: number of loaded segments exceeds int") + seg, addr, err := m.Arena.Allocate(sz, m, pref) + if err != nil { + return nil, 0, err } - - // Transition from sole segment to segment map? - if m.segs == nil && m.firstSeg.msg != nil { - m.segs = make(map[SegmentID]*Segment) - m.segs[0] = &m.firstSeg + if seg == nil { + return nil, 0, errors.New("arena returned nil segment for Allocate()") } - - id, data, err := m.Arena.Allocate(sz, m.segs) - if err != nil { - return nil, exc.WrapError("allocation", err) + if seg.msg != nil && seg.msg != m { + return nil, 0, errors.New("arena returned segment assigned to other message") } - - seg := m.setSegment(id, data) - return seg, nil + seg.msg = m + return seg, addr, nil } func (m *Message) WriteTo(w io.Writer) (int64, error) { @@ -364,31 +346,28 @@ func (m *Message) Marshal() ([]byte, error) { return nil, errors.New("marshal: header size overflows int") } var dataSize uint64 - m.mu.Lock() for i := int64(0); i < nsegs; i++ { - s, err := m.segment(SegmentID(i)) + s, err := m.Segment(SegmentID(i)) if err != nil { - m.mu.Unlock() - return nil, exc.WrapError("marshal", err) + return nil, exc.WrapError("marshal: ", err) + } + if s == nil { + return nil, errors.New("marshal: nil segment") } n := uint64(len(s.data)) if n%uint64(wordSize) != 0 { - m.mu.Unlock() return nil, errors.New("marshal: segment " + str.Itod(i) + " not word-aligned") } if n > uint64(maxSegmentSize) { - m.mu.Unlock() return nil, errors.New("marshal: segment " + str.Itod(i) + " too large") } dataSize += n if dataSize > uint64(maxInt) { - m.mu.Unlock() return nil, errors.New("marshal: message size overflows int") } } total := hdrSize + dataSize if total > uint64(maxInt) { - m.mu.Unlock() return nil, errors.New("marshal: message size overflows int") } @@ -396,19 +375,16 @@ func (m *Message) Marshal() ([]byte, error) { buf := make([]byte, int(hdrSize), int(total)) binary.LittleEndian.PutUint32(buf, uint32(nsegs-1)) for i := int64(0); i < nsegs; i++ { - s, err := m.segment(SegmentID(i)) + s, err := m.Segment(SegmentID(i)) if err != nil { - m.mu.Unlock() - return nil, exc.WrapError("marshal", err) + return nil, exc.WrapError("marshal: ", err) } if len(s.data)%int(wordSize) != 0 { - m.mu.Unlock() return nil, errors.New("marshal: segment " + str.Itod(i) + " not word-aligned") } binary.LittleEndian.PutUint32(buf[int(i+1)*4:], uint32(len(s.data)/int(wordSize))) buf = append(buf, s.data...) } - m.mu.Unlock() return buf, nil } @@ -438,28 +414,5 @@ func (wc *writeCounter) Write(b []byte) (n int, err error) { // use a different segment in the same message if there's not sufficient // capacity. func alloc(s *Segment, sz Size) (*Segment, address, error) { - if sz > maxAllocSize() { - return nil, 0, errors.New("allocation: too large") - } - sz = sz.padToWord() - - if !hasCapacity(s.data, sz) { - var err error - s, err = s.msg.allocSegment(sz) - if err != nil { - return nil, 0, fmt.Errorf("allocSegment failed: %v", err) - } - } - - addr := address(len(s.data)) - end, ok := addr.addSize(sz) - if !ok { - return nil, 0, errors.New("allocation: address overflow") - } - space := s.data[len(s.data):end] - s.data = s.data[:end] - for i := range space { - space[i] = 0 - } - return s, addr, nil + return s.msg.alloc(sz, s) } diff --git a/message_test.go b/message_test.go index 5bd8448c..5bf1ae94 100644 --- a/message_test.go +++ b/message_test.go @@ -604,8 +604,9 @@ type arenaAllocTest struct { } func (test *arenaAllocTest) run(t *testing.T, i int) { - arena, segs := test.init() - id, data, err := arena.Allocate(test.size, segs) + arena, _ := test.init() + seg, _, err := arena.Allocate(test.size, nil, nil) + id, data := seg.id, seg.data if err != nil { t.Errorf("tests[%d] - %s: Allocate error: %v", i, test.name, err) @@ -638,8 +639,8 @@ func (ro readOnlyArena) String() string { return fmt.Sprintf("readOnlyArena{%v}", ro.Arena) } -func (readOnlyArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { - return 0, nil, errReadOnlyArena +func (readOnlyArena) Allocate(sz Size, msg *Message, seg *Segment) (*Segment, address, error) { + return nil, 0, errReadOnlyArena } var errReadOnlyArena = errors.New("Allocate called on read-only arena") diff --git a/segment.go b/segment.go index 119e50cf..1d704c72 100644 --- a/segment.go +++ b/segment.go @@ -98,17 +98,20 @@ func (s *Segment) writeRawPointer(addr address, val rawPointer) { // root returns a 1-element pointer list that references the first word // in the segment. This only makes sense to call on the first segment // in a message. -func (s *Segment) root() PointerList { +// +// Returns true if the root element is allocated in the segment, false +// otherwise. +func (s *Segment) root() (PointerList, bool) { sz := ObjectSize{PointerCount: 1} if !s.regionInBounds(0, sz.totalSize()) { - return PointerList{} + return PointerList{}, false } return PointerList{ seg: s, length: 1, size: sz, depthLimit: s.msg.depthLimit(), - } + }, true } func (s *Segment) lookupSegment(id SegmentID) (*Segment, error) { @@ -395,6 +398,8 @@ func (s *Segment) writePtr(off address, src Ptr, forceCopy bool) error { return nil case hasCapacity(src.seg.data, wordSize): // Enough room adjacent to src to write a far pointer landing pad. + // TODO: instead of alloc (which may choose another segment), + // enforce to _always_ use seg (because we know it has capacity). _, padAddr, _ := alloc(src.seg, wordSize) src.seg.writeRawPointer(padAddr, srcRaw.withOffset(nearPointerOffset(padAddr, srcAddr))) s.writeRawPointer(off, rawFarPointer(src.seg.id, padAddr)) From 4d538679ac2c13d3af250c2791ed3d61ffe55d49 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 26 Apr 2024 11:57:30 -0300 Subject: [PATCH 07/19] Refactor nextAlloc The prior version isn't commented and it's hard to reason about. --- allocator.go | 4 ++-- arena.go | 51 ++++++++++++++++++++++++++------------------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/allocator.go b/allocator.go index 47b01603..482ac59e 100644 --- a/allocator.go +++ b/allocator.go @@ -34,7 +34,7 @@ func (bpa *bufferPoolAllocator) Grow(b []byte, totalMsgSize int64, sz Size) ([]b pool = &bufferpool.Default } - inc, err := nextAlloc(totalMsgSize, int64(maxAllocSize()), sz) + inc, err := nextAlloc(totalMsgSize, sz) if err != nil { return nil, err } @@ -72,7 +72,7 @@ func (bpa *bufferPoolAllocator) Release(b []byte) { type simpleAllocator struct{} func (_ simpleAllocator) Grow(b []byte, totalMsgSize int64, sz Size) ([]byte, error) { - inc, err := nextAlloc(totalMsgSize, int64(maxAllocSize()), sz) + inc, err := nextAlloc(totalMsgSize, sz) if err != nil { return nil, err } diff --git a/arena.go b/arena.go index 4ea6ce92..3e1889dc 100644 --- a/arena.go +++ b/arena.go @@ -120,41 +120,42 @@ func demuxArena(hdr streamHeader, data []byte) (Arena, error) { // bytes. It will always return a multiple of wordSize. max must be a // multiple of wordSize. The sum of curr and the returned size will // always be less than max. -func nextAlloc(curr, max int64, req Size) (int, error) { +func nextAlloc(curr int64, req Size) (int, error) { + if req == 0 { return 0, nil } + + req = req.padToWord() + totalWant := curr + int64(req) + + // Sanity checks. if req > maxAllocSize() { return 0, errors.New("alloc " + req.String() + ": too large") } - padreq := req.padToWord() - want := curr + int64(padreq) - if want <= curr || want > max { + if totalWant <= curr || totalWant > int64(maxAllocSize()) { return 0, errors.New("alloc " + req.String() + ": message size overflow") } - new := curr - double := new + new + + if totalWant < 1024 { + return int(req), nil + } + + // doubleCurr is double the current total message size (padded to the + // word sized). + doubleCurr := (curr*2 + 7) &^ 7 + + // The following attempts to amortize allocation costs across a wide + // range of uses. switch { - case want < 1024: - next := (1024 - curr + 7) &^ 7 - if next < curr { - return int((curr + 7) &^ 7), nil - } - return int(next), nil - case want > double: - return int(padreq), nil + case totalWant < 1024*1024 && doubleCurr < 1024*1024: + // When doubling the message size still keeps the message under + // 1MiB, double the message size. + return int(doubleCurr), nil + default: - for 0 < new && new < want { - new += new / 4 - } - if new <= 0 { - return int(padreq), nil - } - delta := new - curr - if delta > int64(maxAllocSize()) { - return int(maxAllocSize()), nil - } - return int((delta + 7) &^ 7), nil + // Otherwise, allocate the requested amount. + return int(req), nil } } From 336313be1b5ed7398b6039c9efd8b5aead78bcdc Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 26 Apr 2024 11:58:10 -0300 Subject: [PATCH 08/19] Improve captable.release() This makes the captable release more efficient, avoid unnecessary allocations. --- captable.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/captable.go b/captable.go index 95a9da6d..bb4b0927 100644 --- a/captable.go +++ b/captable.go @@ -14,11 +14,21 @@ type CapTable struct { // the length to zero. Clients passed as arguments are added // to the table after zeroing, such that ct.Len() == len(cs). func (ct *CapTable) Reset(cs ...Client) { - for _, c := range ct.cs { - c.Release() - } + switch { + case len(ct.cs) == 0 && len(cs) == 0: + // Nothing to do + return + + case len(ct.cs) == 0 && len(cs) != 0: + ct.cs = cs - ct.cs = append(ct.cs[:0], cs...) + default: + for _, c := range ct.cs { + c.Release() + } + + ct.cs = append(ct.cs[:0], cs...) + } } // Len returns the number of capabilities in the table. From 2af3b9706cb8b0c940fd476db57a2b6d13635ee7 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 26 Apr 2024 12:05:20 -0300 Subject: [PATCH 09/19] Reset message in a cleaner way This avoids some duffcopy calls and improves perf. --- message.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/message.go b/message.go index 6347b069..d2a45d8d 100644 --- a/message.go +++ b/message.go @@ -166,12 +166,9 @@ func (m *Message) ResetForRead(arena Arena) { if m.Arena != nil { m.Arena.Release() } - *m = Message{ - Arena: arena, - TraverseLimit: m.TraverseLimit, - DepthLimit: m.DepthLimit, - capTable: m.capTable, - } + m.Arena = arena + m.rlimit = atomic.Uint64{} + m.rlimitInit = sync.Once{} } func (m *Message) initReadLimit() { From 9f83283860f4b2c7f9617a73f59e5addaae320b5 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 26 Apr 2024 12:10:28 -0300 Subject: [PATCH 10/19] fixup! Refactor nextAlloc --- message_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/message_test.go b/message_test.go index 5bf1ae94..f2dc27a4 100644 --- a/message_test.go +++ b/message_test.go @@ -530,7 +530,7 @@ func TestNextAlloc(t *testing.T) { t.Errorf("%s: max must be word-aligned. Skipped.", test.name) continue } - got, err := nextAlloc(test.curr, test.max, test.req) + got, err := nextAlloc(test.curr, test.req) if err != nil { if test.ok { t.Errorf("%s: nextAlloc(%d, %d, %d) = _, %v; want >=%d, ", test.name, test.curr, test.max, test.req, err, test.req) From 1651df3b9f5ec3b3aa69d6dbe27a573c884688a4 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 26 Apr 2024 12:16:15 -0300 Subject: [PATCH 11/19] Add AllocateAsRoot --- message.go | 35 +++++++++++++++++++++++++++++++++++ message_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/message.go b/message.go index d2a45d8d..01a07831 100644 --- a/message.go +++ b/message.go @@ -225,6 +225,41 @@ func (m *Message) Root() (Ptr, error) { return p, nil } +// AllocateAsRoot allocates the passed size and sets that as the root structure +// of the message. +func (m *Message) AllocateAsRoot(size ObjectSize) (*Segment, address, error) { + // Allocate enough for the root pointer + the root structure. Doing a + // single alloc ensures both end up on the same segment (which should + // be segment zero and the offset should also be zero, meaning the root + // pointer is the first data written to the segment). + // + // Technically, it could be the case (depending on the arena + // implementation) that the first segment would have only the root + // pointer (one word) and then the root struct would be put in a + // different segment (using a landing pad), but that would be very + // inneficient in several ways, so we opt here to enforce a single + // alloc for both as an optimization. + s, rootAddr, err := m.alloc(wordSize+size.totalSize(), nil) + if err != nil { + return nil, 0, err + } + if s.ID() != 0 { + return nil, 0, errors.New("root was not allocated on first segment") + } + if rootAddr != 0 { + return nil, 0, errors.New("root struct was already allocated") + } + + // The root struct starts immediately after the root pointer (which + // takes one word). + srcAddr := address(wordSize) + + // FIXME: does not handle lists and interfaces/capabilities yet. + srcRaw := rawStructPointer(0, size) + s.writeRawPointer(rootAddr, srcRaw.withOffset(nearPointerOffset(rootAddr, srcAddr))) + return s, srcAddr, nil +} + // SetRoot sets the message's root object to p. func (m *Message) SetRoot(p Ptr) error { s, err := m.Segment(0) diff --git a/message_test.go b/message_test.go index f2dc27a4..0163cd69 100644 --- a/message_test.go +++ b/message_test.go @@ -663,6 +663,54 @@ func BenchmarkMessageGetFirstSegment(b *testing.B) { } } +// BenchmarkMessageSetRoot benchmarks setting the root structure of a message. +func BenchmarkMessageSetRoot(b *testing.B) { + var msg Message + var arena Arena = SingleSegment(nil) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _, err := msg.Reset(arena) + if err != nil { + b.Fatal(err) + } + s, err := msg.Segment(0) + if err != nil { + b.Fatal(err) + } + + st, err := NewRootStruct(s, ObjectSize{DataSize: 8, PointerCount: 0}) + if err != nil { + b.Fatal(err) + } + err = msg.SetRoot(st.ToPtr()) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkMessageAllocateAsRoot benchmarks using AllocateAsRoot to allocate +// a new root structure. +func BenchmarkMessageAllocateAsRoot(b *testing.B) { + var msg Message + var arena Arena = SingleSegment(nil) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + // NOTE: Needs to be ResetForRead() because Reset() allocates + // the root pointer. This is part of API madness. + msg.ResetForRead(arena) + + _, _, err := msg.AllocateAsRoot(ObjectSize{DataSize: 8, PointerCount: 0}) + if err != nil { + b.Fatal(err) + } + } +} + // TestCannotResetArenaForRead demonstrates that Reset() cannot be used when // intending to read data from an arena (i.e. cannot reuse a msg value by // calling Reset with the intention to read data). From c93652fe0c3c04d47202fd3ac92b3805c4a4691f Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Wed, 1 May 2024 17:14:56 -0300 Subject: [PATCH 12/19] Add experimental AllocateAsRoot and SetFlatText --- arena.go | 55 ++++++++++++++++++++++++++ internal/aircraftlib/experimental.go | 19 +++++++++ struct.go | 44 +++++++++++++++++++++ struct_test.go | 59 ++++++++++++++++++++++++++++ 4 files changed, 177 insertions(+) create mode 100644 internal/aircraftlib/experimental.go create mode 100644 struct_test.go diff --git a/arena.go b/arena.go index 3e1889dc..589bbcb9 100644 --- a/arena.go +++ b/arena.go @@ -199,3 +199,58 @@ func (a *ReadOnlySingleSegmentArena) Release() { func (a *ReadOnlySingleSegmentArena) UseBuffer(b []byte) { a.data = b } + +// SimpleSingleSegmentArena is an alternative implementation of a single +// segment arena that is not subject to the same legacy behavior as the the +// single segment arena initialized by SingleSegmentArena. +// +// This arena is not safe for concurrent access and holds onto the last +// allocated buffer for reuse after a call to Relese(). +type SimpleSingleSegmentArena Segment + +func (a *SimpleSingleSegmentArena) NumSegments() int64 { return 1 } + +func (a *SimpleSingleSegmentArena) Data(id SegmentID) ([]byte, error) { + if id != 0 { + return nil, errors.New("segment out of bounds") + } + return a.data, nil +} + +func (a *SimpleSingleSegmentArena) Segment(id SegmentID) *Segment { + if id != 0 { + return nil + } + return (*Segment)(a) +} + +func (a *SimpleSingleSegmentArena) Allocate(sz Size, msg *Message, seg *Segment) (*Segment, address, error) { + totalMsgSize := int64(len(a.data)) + inc, err := nextAlloc(totalMsgSize, sz) + if err != nil { + return nil, 0, err + } + + addr := address(len(a.data)) + capNeeded := len(a.data) + inc + if capNeeded <= cap(a.data) { + a.data = a.data[:capNeeded] + } else { + a.data = append(a.data, make([]byte, inc)...) + } + return (*Segment)(a), addr, nil +} + +func (a *SimpleSingleSegmentArena) Release() { + for i := range a.data { + a.data[i] = 0 + } + a.data = a.data[:0] +} + +// ReplaceBuffer replaces the internal buffer with a new buffer. This +// effectively resets a message to the one encoded by the passed buffer, which +// should be a single segment message. +func (a *SimpleSingleSegmentArena) ReplaceBuffer(b []byte) { + a.data = b +} diff --git a/internal/aircraftlib/experimental.go b/internal/aircraftlib/experimental.go new file mode 100644 index 00000000..1a44c356 --- /dev/null +++ b/internal/aircraftlib/experimental.go @@ -0,0 +1,19 @@ +package aircraftlib + +import capnp "capnproto.org/go/capnp/v3" + +// Experimental: This could replace NewRootBenchmarkA without having to modify +// the public API. +func AllocateNewRootBenchmark(msg *capnp.Message) (BenchmarkA, error) { + st, err := capnp.AllocateRootStruct(msg, capnp.ObjectSize{DataSize: 24, PointerCount: 2}) + return BenchmarkA(st), err + +} + +// Exprimental: set the name using the flat/unrolled version of SetNewText. +// +// If the unrolled version is deemed good enough, it would just be replaced +// inside SetNewText, without having to alter the public API. +func (s BenchmarkA) FlatSetName(v string) error { + return capnp.Struct(s).FlatSetNewText(0, v) +} diff --git a/struct.go b/struct.go index 201c006a..18deffee 100644 --- a/struct.go +++ b/struct.go @@ -21,6 +21,25 @@ type StructKind = struct { flags structFlags } +// AllocateRootStruct allocates the root struct on the message as a struct of +// the passed size. +func AllocateRootStruct(msg *Message, sz ObjectSize) (Struct, error) { + if !sz.isValid() { + return Struct{}, errors.New("new struct: invalid size") + } + sz.DataSize = sz.DataSize.padToWord() + s, addr, err := msg.AllocateAsRoot(sz) + if err != nil { + return Struct{}, exc.WrapError("new struct", err) + } + return Struct{ + seg: s, + off: addr, + size: sz, + depthLimit: maxDepth, + }, nil +} + // NewStruct creates a new struct, preferring placement in s. func NewStruct(s *Segment, sz ObjectSize) (Struct, error) { if !sz.isValid() { @@ -141,6 +160,31 @@ func (p Struct) SetText(i uint16, v string) error { return p.SetNewText(i, v) } +func (p Struct) FlatSetNewText(i uint16, v string) error { + // NewText().newPrimitiveList + sz := Size(1) + n := int32(len(v) + 1) + total := sz.timesUnchecked(n) + s, addr, err := alloc(p.seg, total) + if err != nil { + return err + } + + // NewText() + copy(s.slice(addr, Size(len(v))), v) + + // SetPtr().pointerAddress() + // offInsideP := p.pointerAddress(i) + ptrStart, _ := p.off.addSize(p.size.DataSize) + offInsideP, _ := ptrStart.element(int32(i), wordSize) + + // SetPtr().writePtr() + srcAddr := addr // srcAddr = l.off + srcRaw := rawListPointer(0, byte1List, int32(len(v))) // srcRaw = l.raw() + s.writeRawPointer(offInsideP, srcRaw.withOffset(nearPointerOffset(offInsideP, srcAddr))) + return nil +} + // SetNewText sets the i'th pointer to a newly allocated text. func (p Struct) SetNewText(i uint16, v string) error { t, err := NewText(p.seg, v) diff --git a/struct_test.go b/struct_test.go new file mode 100644 index 00000000..f2727a07 --- /dev/null +++ b/struct_test.go @@ -0,0 +1,59 @@ +package capnp_test + +import ( + "testing" + + "capnproto.org/go/capnp/v3" + "capnproto.org/go/capnp/v3/internal/aircraftlib" +) + +// BenchmarkSetText benchmarks setting a single text field in a message. +func BenchmarkSetText(b *testing.B) { + var msg capnp.Message + var arena = &capnp.SimpleSingleSegmentArena{} + arena.ReplaceBuffer(make([]byte, 0, 1024)) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // NOTE: Needs to be ResetForRead() because Reset() allocates + // the root pointer. This is part of API madness. + msg.ResetForRead(arena) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + b.Fatal(err) + } + + err = a.SetName("my name") + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkSetTextFlat(b *testing.B) { + var msg capnp.Message + var arena = &capnp.SimpleSingleSegmentArena{} + arena.ReplaceBuffer(make([]byte, 0, 1024)) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // NOTE: Needs to be ResetForRead() because Reset() allocates + // the root pointer. This is part of API madness. + msg.ResetForRead(arena) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + b.Fatal(err) + } + + err = a.FlatSetName("my name") + if err != nil { + b.Fatal(err) + } + } +} From bde3071c91a90ec81fbaf1dd12226b8004c48e6a Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 3 May 2024 15:54:51 -0300 Subject: [PATCH 13/19] Add UpdateText() version --- internal/aircraftlib/experimental.go | 5 +++ struct.go | 53 ++++++++++++++++++++++++++++ struct_test.go | 34 ++++++++++++++++++ 3 files changed, 92 insertions(+) diff --git a/internal/aircraftlib/experimental.go b/internal/aircraftlib/experimental.go index 1a44c356..8dfc4476 100644 --- a/internal/aircraftlib/experimental.go +++ b/internal/aircraftlib/experimental.go @@ -17,3 +17,8 @@ func AllocateNewRootBenchmark(msg *capnp.Message) (BenchmarkA, error) { func (s BenchmarkA) FlatSetName(v string) error { return capnp.Struct(s).FlatSetNewText(0, v) } + +// Experimental: update the set in-place. +func (s BenchmarkA) UpdateName(v string) error { + return capnp.Struct(s).UpdateText(0, v) +} diff --git a/struct.go b/struct.go index 18deffee..adcd734d 100644 --- a/struct.go +++ b/struct.go @@ -185,6 +185,59 @@ func (p Struct) FlatSetNewText(i uint16, v string) error { return nil } +// EXPERIMENTAL unrolled version of updating a text field in-place when it +// already has enough space to hold the value (as opposed to allocating a new +// object in the message). +func (p Struct) UpdateText(i uint16, v string) error { + // Determine pointer offset. + ptrStart, _ := p.off.addSize(p.size.DataSize) + offInsideP, _ := ptrStart.element(int32(i), wordSize) + + // ptr, err := p.seg.readPtr(offInsideP, p.depthLimit) + s, base, val, err := p.seg.resolveFarPointer(offInsideP) + if err != nil { + return exc.WrapError("read pointer", err) + } + + // TODO: depth limit read check? + + if val == 0 { + // Existing pointer is empty/void. + return p.FlatSetNewText(i, v) + } + + // lp, err := s.readListPtr(base, val) + addr, ok := val.offset().resolve(base) + if !ok { + return errors.New("list pointer: invalid address") + } + // TODO: list checks from readListPtr()? + + if err != nil { + return exc.WrapError("read pointer", err) + } + + length := int(val.numListElements()) + + if length < len(v)+1 { + // Existing buffer does not have enough space for new text. + return p.FlatSetNewText(i, v) + } + + // Existing buffer location has space for new text. Copy text over it. + dst := s.slice(addr, Size(length)) + n := copy(dst, []byte(v)) + + // Pad with zeros (clear leftover). Last byte is already zero. + // + // TODO: replace with clear(dst[n:length-1]) after go1.21. + for i := n; i < int(length-1); i++ { + dst[i] = 0 + } + + return nil +} + // SetNewText sets the i'th pointer to a newly allocated text. func (p Struct) SetNewText(i uint16, v string) error { t, err := NewText(p.seg, v) diff --git a/struct_test.go b/struct_test.go index f2727a07..119e773d 100644 --- a/struct_test.go +++ b/struct_test.go @@ -57,3 +57,37 @@ func BenchmarkSetTextFlat(b *testing.B) { } } } + +// BenchmarkSetTextUpdate benchmarks updating the text field in-place. +func BenchmarkSetTextUpdate(b *testing.B) { + var msg capnp.Message + var arena = &capnp.SimpleSingleSegmentArena{} + arena.ReplaceBuffer(make([]byte, 0, 1024)) + + // NOTE: Needs to be ResetForRead() because Reset() allocates + // the root pointer. This is part of API madness. + msg.ResetForRead(arena) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + b.Fatal(err) + } + + err = a.SetName("my name") + if err != nil { + b.Fatal(err) + } + + // WHY?!?!?!? + msg.ResetReadLimit(1 << 31) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := a.UpdateName("my name") + if err != nil { + b.Fatal(err) + } + } +} From e7d8d6ffc62f046a0b7bc7084097f360df603a6e Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Mon, 6 May 2024 14:55:54 -0300 Subject: [PATCH 14/19] experimental: Add TextField The TextField is a reference to a specific text field inside a struct. It records both the pointer and value locations inside a struct, which may be used to fetch or update the underlying value. --- internal/aircraftlib/experimental.go | 5 +++ struct.go | 62 ++++++++++++++++++++++++++++ struct_test.go | 38 +++++++++++++++++ 3 files changed, 105 insertions(+) diff --git a/internal/aircraftlib/experimental.go b/internal/aircraftlib/experimental.go index 8dfc4476..50fa7214 100644 --- a/internal/aircraftlib/experimental.go +++ b/internal/aircraftlib/experimental.go @@ -22,3 +22,8 @@ func (s BenchmarkA) FlatSetName(v string) error { func (s BenchmarkA) UpdateName(v string) error { return capnp.Struct(s).UpdateText(0, v) } + +// Experimental: return the name as a field that can be mutated. +func (s BenchmarkA) NameField() (capnp.TextField, error) { + return capnp.Struct(s).TextField(0) +} diff --git a/struct.go b/struct.go index adcd734d..331ffdc2 100644 --- a/struct.go +++ b/struct.go @@ -238,6 +238,68 @@ func (p Struct) UpdateText(i uint16, v string) error { return nil } +type TextField struct { + // Pointer location + pSeg *Segment + pAddr address + + // Current value location + vSeg *Segment + vAddr address + vLen int +} + +// EXPERIMENTAL: return ith pointer as a text field. +func (p Struct) TextField(i uint16) (TextField, error) { + ptrStart, _ := p.off.addSize(p.size.DataSize) + offInsideP, _ := ptrStart.element(int32(i), wordSize) + + // ptr, err := p.seg.readPtr(offInsideP, p.depthLimit) + s, base, val, err := p.seg.resolveFarPointer(offInsideP) + if err != nil { + return TextField{}, exc.WrapError("read pointer", err) + } + + tf := TextField{pSeg: p.seg, pAddr: offInsideP} + + if val == 0 { + return tf, nil + } + + addr, ok := val.offset().resolve(base) + if !ok { + return TextField{}, errors.New("list pointer: invalid address") + } + + tf.vSeg = s + tf.vLen = int(val.numListElements()) + tf.vAddr = addr + + return tf, nil +} + +// UpdateText updates the value of the text field. +func (tf *TextField) Set(v string) error { + if tf.vLen < len(v)+1 || tf.vSeg == nil { + // TODO: handle this case. Needs to alloc and set pointer. + // Needs to set tf.vSeg, tf.vLen and tf.vAddr. + panic("we can work it out") + } + + // Existing buffer location has space for new text. Copy text over it. + dst := tf.vSeg.slice(tf.vAddr, Size(tf.vLen)) + n := copy(dst, []byte(v)) + + // Pad with zeros (clear leftover). Last byte is already zero. + // + // TODO: replace with clear(dst[n:length-1]) after go1.21. + for i := n; i < int(tf.vLen-1); i++ { + dst[i] = 0 + } + + return nil +} + // SetNewText sets the i'th pointer to a newly allocated text. func (p Struct) SetNewText(i uint16, v string) error { t, err := NewText(p.seg, v) diff --git a/struct_test.go b/struct_test.go index 119e773d..d4fb2c4a 100644 --- a/struct_test.go +++ b/struct_test.go @@ -91,3 +91,41 @@ func BenchmarkSetTextUpdate(b *testing.B) { } } } + +func BenchmarkSetTextAsField(b *testing.B) { + var msg capnp.Message + var arena = &capnp.SimpleSingleSegmentArena{} + arena.ReplaceBuffer(make([]byte, 0, 1024)) + + // NOTE: Needs to be ResetForRead() because Reset() allocates + // the root pointer. This is part of API madness. + msg.ResetForRead(arena) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + b.Fatal(err) + } + + err = a.SetName("my name") + if err != nil { + b.Fatal(err) + } + + // WHY?!?!?!? + msg.ResetReadLimit(1 << 31) + + nameField, err := a.NameField() + if err != nil { + b.Fatal(err) + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := nameField.Set("my name") + if err != nil { + b.Fatal(err) + } + } +} From 082a6b9caf8c81d205553155111ae093e729233f Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Thu, 16 May 2024 08:49:20 -0300 Subject: [PATCH 15/19] wip add get textfield --- message.go | 16 +++++++--- my_test.go | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ struct.go | 25 +++++++++++++++ 3 files changed, 128 insertions(+), 5 deletions(-) create mode 100644 my_test.go diff --git a/message.go b/message.go index 01a07831..61fe0bfe 100644 --- a/message.go +++ b/message.go @@ -365,9 +365,9 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { return wc.N, err } -// Marshal concatenates the segments in the message into a single byte +// MarshalInto concatenates the segments in the message into a single byte // slice including framing. -func (m *Message) Marshal() ([]byte, error) { +func (m *Message) MarshalInto(buf []byte) ([]byte, error) { // Compute buffer size. nsegs := m.NumSegments() if nsegs == 0 { @@ -404,8 +404,10 @@ func (m *Message) Marshal() ([]byte, error) { } // Fill buffer. - buf := make([]byte, int(hdrSize), int(total)) - binary.LittleEndian.PutUint32(buf, uint32(nsegs-1)) + if buf == nil { + buf = make([]byte, 0, int(total)) + } + buf = binary.LittleEndian.AppendUint32(buf, uint32(nsegs-1)) for i := int64(0); i < nsegs; i++ { s, err := m.Segment(SegmentID(i)) if err != nil { @@ -414,12 +416,16 @@ func (m *Message) Marshal() ([]byte, error) { if len(s.data)%int(wordSize) != 0 { return nil, errors.New("marshal: segment " + str.Itod(i) + " not word-aligned") } - binary.LittleEndian.PutUint32(buf[int(i+1)*4:], uint32(len(s.data)/int(wordSize))) + buf = binary.LittleEndian.AppendUint32(buf, uint32(len(s.data)/int(wordSize))) buf = append(buf, s.data...) } return buf, nil } +func (m *Message) Marshal() ([]byte, error) { + return m.MarshalInto(nil) +} + // MarshalPacked marshals the message in packed form. func (m *Message) MarshalPacked() ([]byte, error) { data, err := m.Marshal() diff --git a/my_test.go b/my_test.go new file mode 100644 index 00000000..3d688822 --- /dev/null +++ b/my_test.go @@ -0,0 +1,92 @@ +package capnp_test + +import ( + "testing" + + "capnproto.org/go/capnp/v3" + "capnproto.org/go/capnp/v3/internal/aircraftlib" +) + +func BenchmarkSetText05(b *testing.B) { + var msg capnp.Message + arena := &capnp.SimpleSingleSegmentArena{} + + msg.ResetForRead(arena) + seg, err := msg.Segment(0) + if err != nil { + b.Fatal(err) + } + tx, err := aircraftlib.NewBenchmarkA(seg) + if err != nil { + b.Fatal(err) + } + + err = tx.SetName("my own descr") + if err != nil { + b.Fatal(err) + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + msg.ResetForRead(arena) + seg, _ := msg.Segment(0) + + tx, err := aircraftlib.NewBenchmarkA(seg) + if err != nil { + b.Fatal(err) + } + + err = tx.SetName("my own descr") + if err != nil { + b.Fatal(err) + } + } + + // b.Log(arena.String()) + +} + +func BenchmarkSetInt(b *testing.B) { + var msg capnp.Message + arena := &capnp.SimpleSingleSegmentArena{} + + msg.ResetForRead(arena) + seg, err := msg.Segment(0) + if err != nil { + b.Fatal(err) + } + tx, err := aircraftlib.NewBenchmarkA(seg) + if err != nil { + b.Fatal(err) + } + + tx.SetBirthDay(0x20010101) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + tx.SetBirthDay(0x20010101 + int64(i)) + } + + // b.Log(arena.String()) + +} +func BenchmarkSetTextBaselineCopyClear(b *testing.B) { + var pt *[]byte + buf := make([]byte, 1024) + pt = &buf + + b.ResetTimer() + src := "my own descr" + off := 48 + for i := 0; i < b.N; i++ { + s := (*pt)[off : off+len(src)+1] + n := copy(s, []byte(src)) + for i := n; i < len(s); i++ { + s[i] = 0 + } + } +} diff --git a/struct.go b/struct.go index 331ffdc2..f03d7f0b 100644 --- a/struct.go +++ b/struct.go @@ -1,7 +1,9 @@ package capnp import ( + "bytes" "errors" + "unsafe" "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/internal/str" @@ -300,6 +302,29 @@ func (tf *TextField) Set(v string) error { return nil } +func trimZero(r rune) bool { + return r == 0 +} + +func (tf *TextField) Get() string { + if tf.vSeg == nil { + panic("not allocated") + } + + if tf.vLen == 0 { + return "" + } + + b := tf.vSeg.slice(tf.vAddr, Size(tf.vLen)) + return string(bytes.TrimRightFunc(b, trimZero)) +} + +func (tf *TextField) GetUnsafe() string { + b := tf.vSeg.slice(tf.vAddr, Size(tf.vLen)) + b = bytes.TrimRightFunc(b, trimZero) + return *(*string)(unsafe.Pointer(&b)) +} + // SetNewText sets the i'th pointer to a newly allocated text. func (p Struct) SetNewText(i uint16, v string) error { t, err := NewText(p.seg, v) From 7c36466fca07d2b01321f6126bc579ff35554542 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Tue, 4 Jun 2024 18:29:12 -0300 Subject: [PATCH 16/19] wip benchmark --- go.mod | 4 + internal/aircraftlib/experimental.go | 31 +++++++- my_test.go | 105 +++++++++++++++++++++++++++ segment.go | 11 ++- struct.go | 63 +++++++++++++++- 5 files changed, 210 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index fbd8f94d..305eec68 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module capnproto.org/go/capnp/v3 go 1.19 require ( + + github.com/alecthomas/go_serialization_benchmarks/goserbench v0.0.0 github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 github.com/kylelemons/godebug v1.1.0 github.com/stretchr/testify v1.8.2 @@ -12,6 +14,8 @@ require ( golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 ) +replace github.com/alecthomas/go_serialization_benchmarks/goserbench => /home/user/testes/vendor/alecthomas/go_serialization_benchmarks/goserbench + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/philhofer/fwd v1.1.1 // indirect diff --git a/internal/aircraftlib/experimental.go b/internal/aircraftlib/experimental.go index 50fa7214..498c8173 100644 --- a/internal/aircraftlib/experimental.go +++ b/internal/aircraftlib/experimental.go @@ -1,6 +1,10 @@ package aircraftlib -import capnp "capnproto.org/go/capnp/v3" +import ( + math "math" + + capnp "capnproto.org/go/capnp/v3" +) // Experimental: This could replace NewRootBenchmarkA without having to modify // the public API. @@ -27,3 +31,28 @@ func (s BenchmarkA) UpdateName(v string) error { func (s BenchmarkA) NameField() (capnp.TextField, error) { return capnp.Struct(s).TextField(0) } + +func (s BenchmarkA) FlatSetPhone(v string) error { + return capnp.Struct(s).FlatSetNewText(1, v) +} + +func (s BenchmarkA) PhoneField() (capnp.TextField, error) { + return capnp.Struct(s).TextField(1) +} + +func (s *BenchmarkA) SetMoneyp(v float64) { + bits := math.Float64bits(v) + (*capnp.Struct)(s).SetUint64p(16, bits) +} + +func (s *BenchmarkA) SetSpousep(v bool) { + (*capnp.Struct)(s).SetBitp(96, v) +} + +func (s *BenchmarkA) SetSiblingsp(v int32) { + (*capnp.Struct)(s).SetUint32p(8, uint32(v)) +} + +func (s *BenchmarkA) SetBirthDayp(v int64) { + (*capnp.Struct)(s).SetUint64p(0, uint64(v)) +} diff --git a/my_test.go b/my_test.go index 3d688822..4e9bb807 100644 --- a/my_test.go +++ b/my_test.go @@ -1,10 +1,13 @@ package capnp_test import ( + "strings" "testing" "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/internal/aircraftlib" + + "github.com/alecthomas/go_serialization_benchmarks/goserbench" ) func BenchmarkSetText05(b *testing.B) { @@ -90,3 +93,105 @@ func BenchmarkSetTextBaselineCopyClear(b *testing.B) { } } } + +type CapNProtoSerializer struct { + msg capnp.Message + arena *capnp.SimpleSingleSegmentArena + c *aircraftlib.BenchmarkA + + fieldName *capnp.TextField + fieldPhone *capnp.TextField +} + +func (x *CapNProtoSerializer) Marshal(o interface{}) ([]byte, error) { + a := o.(*goserbench.SmallStruct) + /* + x.msg.ResetForRead(x.arena) + seg, err := x.msg.Segment(0) + if err != nil { + return nil, err + } + + c, err := aircraftlib.AllocateNewRootBenchmark(&x.msg) + if err != nil { + return nil, err + } + */ + + c := x.c + c.SetBirthDay(a.BirthDay.UnixNano()) + c.SetSiblingsp(int32(a.Siblings)) + c.SetSpousep(a.Spouse) + c.SetMoneyp(a.Money) // c.SetMoney(a.Money) + x.fieldName.Set(a.Name) // c.FlatSetName(a.Name) // c.SetName(a.Name) + x.fieldPhone.Set(a.Phone) // c.FlatSetPhone(a.Phone) // c.SetPhone(a.Phone) + + return x.arena.Data(0) +} + +func (x *CapNProtoSerializer) Unmarshal(d []byte, i interface{}) error { + /* + a := i.(*goserbench.SmallStruct) + + s, _, err := capn.ReadFromMemoryZeroCopy(d) + if err != nil { + return err + + } + o := aircraftlib.ReadRootBenchmarkA(s) + a.Name = o.Name() + a.BirthDay = time.Unix(0, o.BirthDay()) + a.Phone = o.Phone() + a.Siblings = int(o.Siblings()) + a.Spouse = o.Spouse() + a.Money = o.Money() + return nil + */ + return nil +} + +func NewCapNProtoSerializer() goserbench.Serializer { + arena := &capnp.SimpleSingleSegmentArena{} + var msg capnp.Message + msg.ResetForRead(arena) + msg.ResetReadLimit(1 << 31) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + panic(err) + } + if err := a.SetName(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { + panic(err) + } + if err := a.SetPhone(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { + panic(err) + } + + fieldName, err := a.NameField() + if err != nil { + panic(err) + } + fieldPhone, err := a.PhoneField() + if err != nil { + panic(err) + } + + return &CapNProtoSerializer{ + arena: arena, + c: &a, + fieldName: &fieldName, + fieldPhone: &fieldPhone, + } + +} + +func BenchmarkGoserBench(b *testing.B) { + b.Run("marshal", func(b *testing.B) { + goserbench.BenchMarshalSmallStruct(b, NewCapNProtoSerializer()) + }) + + b.Run("unmarshal", func(b *testing.B) { + goserbench.BenchUnmarshalSmallStruct(b, NewCapNProtoSerializer(), false) + }) + +} diff --git a/segment.go b/segment.go index 1d704c72..c5226020 100644 --- a/segment.go +++ b/segment.go @@ -76,7 +76,16 @@ func (s *Segment) readRawPointer(addr address) rawPointer { } func (s *Segment) writeUint8(addr address, val uint8) { - s.slice(addr, 1)[0] = val + // s.slice(addr, 1)[0] = val + s.data[addr] = val +} + +func (s *Segment) setBit(addr address, off uint8) { + s.data[addr] |= 1 << off +} + +func (s *Segment) clearBit(addr address, off uint8) { + s.data[addr] &^= 1 << off } func (s *Segment) writeUint16(addr address, val uint16) { diff --git a/struct.go b/struct.go index f03d7f0b..200d1772 100644 --- a/struct.go +++ b/struct.go @@ -2,6 +2,7 @@ package capnp import ( "bytes" + "encoding/binary" "errors" "unsafe" @@ -366,7 +367,7 @@ func (p Struct) pointerAddress(i uint16) address { } // bitInData reports whether bit is inside p's data section. -func (p Struct) bitInData(bit BitOffset) bool { +func (p *Struct) bitInData(bit BitOffset) bool { return p.seg != nil && bit < BitOffset(p.size.DataSize*8) } @@ -394,7 +395,21 @@ func (p Struct) SetBit(n BitOffset, v bool) { p.seg.writeUint8(addr, b) } -func (p Struct) dataAddress(off DataOffset, sz Size) (addr address, ok bool) { +func (p *Struct) SetBitp(n BitOffset, v bool) { + if !p.bitInData(n) { + panic("capnp: set field outside struct boundaries") + } + addr := p.off.addOffset(n.offset()) + b := p.seg.readUint8(addr) + if v { + b |= n.mask() + } else { + b &^= n.mask() + } + p.seg.writeUint8(addr, b) +} + +func (p *Struct) dataAddress(off DataOffset, sz Size) (addr address, ok bool) { if p.seg == nil || Size(off)+sz > p.size.DataSize { return 0, false } @@ -464,6 +479,14 @@ func (p Struct) SetUint32(off DataOffset, v uint32) { p.seg.writeUint32(addr, v) } +func (p *Struct) SetUint32p(off DataOffset, v uint32) { + addr, ok := p.dataAddress(off, 4) + if !ok { + panic("capnp: set field outside struct boundaries") + } + p.seg.writeUint32(addr, v) +} + // SetUint64 sets the 64-bit integer that is off bytes from the start of the struct to v. func (p Struct) SetUint64(off DataOffset, v uint64) { addr, ok := p.dataAddress(off, 8) @@ -473,6 +496,42 @@ func (p Struct) SetUint64(off DataOffset, v uint64) { p.seg.writeUint64(addr, v) } +func (p *Struct) SetUint64p(off DataOffset, v uint64) { + addr, ok := p.dataAddress(off, 8) + if !ok { + panic("capnp: set field outside struct boundaries") + } + + // p.seg.writeUint64(addr, v) + b := p.seg.slice(addr, 8) + binary.LittleEndian.PutUint64(b, v) + + /* + b := p.seg.slice(addr, 8) + b[0] = byte(v) + b[1] = byte(v >> 8) + b[2] = byte(v >> 16) + b[3] = byte(v >> 24) + b[4] = byte(v >> 32) + b[5] = byte(v >> 40) + b[6] = byte(v >> 48) + b[7] = byte(v >> 56) + */ + /* + b := p.seg.data + _ = b[addr+7] + b[addr] = byte(v) + b[addr+1] = byte(v >> 8) + b[addr+2] = byte(v >> 16) + b[addr+3] = byte(v >> 24) + b[addr+4] = byte(v >> 32) + b[addr+5] = byte(v >> 40) + b[addr+6] = byte(v >> 48) + b[addr+7] = byte(v >> 56) + */ + +} + // structFlags is a bitmask of flags for a pointer. type structFlags uint8 From b1c4a4e2dbaaddfa253ee48b870489e9b28bbea4 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Mon, 17 Jun 2024 15:54:15 -0300 Subject: [PATCH 17/19] more wip --- go.mod | 3 +- go.sum | 2 + internal/aircraftlib/experimental.go | 35 ++- message.go | 2 + my_test.go | 309 +++++++++++++++++++++++++-- pointer.go | 14 ++ rawpointer.go | 4 + segment.go | 6 + struct.go | 225 ++++++++++++++++++- 9 files changed, 571 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 305eec68..682b37d0 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module capnproto.org/go/capnp/v3 go 1.19 require ( - github.com/alecthomas/go_serialization_benchmarks/goserbench v0.0.0 github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 github.com/kylelemons/godebug v1.1.0 @@ -14,7 +13,7 @@ require ( golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 ) -replace github.com/alecthomas/go_serialization_benchmarks/goserbench => /home/user/testes/vendor/alecthomas/go_serialization_benchmarks/goserbench +replace github.com/alecthomas/go_serialization_benchmarks/goserbench => github.com/matheusd/go_serialization_benchmarks/goserbench v0.1.0 require ( github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 1076665e..463bde78 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/matheusd/go_serialization_benchmarks/goserbench v0.1.0 h1:P9R5TzTvWtYIcDTAhcxYOZ9K7L01ZjoGtN/fGBv44+w= +github.com/matheusd/go_serialization_benchmarks/goserbench v0.1.0/go.mod h1:ggzY78sMTNVN7i7GWrQbI8lM37KVujasWcBjZL1Esjg= github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/internal/aircraftlib/experimental.go b/internal/aircraftlib/experimental.go index 498c8173..2fbf3c01 100644 --- a/internal/aircraftlib/experimental.go +++ b/internal/aircraftlib/experimental.go @@ -36,13 +36,28 @@ func (s BenchmarkA) FlatSetPhone(v string) error { return capnp.Struct(s).FlatSetNewText(1, v) } +func (s *BenchmarkA) GetName() (string, error) { + return (*capnp.Struct)(s).GetTextUnsafe(0) +} + +func (s *BenchmarkA) GetNameSuperUnsafe() (string, error) { + return (*capnp.Struct)(s).GetTextSuperUnsafe(0) +} + +func (s *BenchmarkA) GetPhone() (string, error) { + return (*capnp.Struct)(s).GetTextUnsafe(1) +} + +func (s *BenchmarkA) GetPhoneSuperUnsafe() (string, error) { + return (*capnp.Struct)(s).GetTextSuperUnsafe(1) +} + func (s BenchmarkA) PhoneField() (capnp.TextField, error) { return capnp.Struct(s).TextField(1) } func (s *BenchmarkA) SetMoneyp(v float64) { - bits := math.Float64bits(v) - (*capnp.Struct)(s).SetUint64p(16, bits) + (*capnp.Struct)(s).SetFloat64p(16, v) } func (s *BenchmarkA) SetSpousep(v bool) { @@ -53,6 +68,22 @@ func (s *BenchmarkA) SetSiblingsp(v int32) { (*capnp.Struct)(s).SetUint32p(8, uint32(v)) } +func (s *BenchmarkA) GetSiblings() int32 { + return int32((*capnp.Struct)(s).Uint32p(8)) +} + func (s *BenchmarkA) SetBirthDayp(v int64) { (*capnp.Struct)(s).SetUint64p(0, uint64(v)) } + +func (s *BenchmarkA) GetBirthDay() int64 { + return int64((*capnp.Struct)(s).Uint64p(0)) +} + +func (s *BenchmarkA) GetMoney() float64 { + return math.Float64frombits((*capnp.Struct)(s).Uint64p(16)) +} + +func (s *BenchmarkA) GetSpouse() bool { + return (*capnp.Struct)(s).Bitp(96) +} diff --git a/message.go b/message.go index 61fe0bfe..0581059d 100644 --- a/message.go +++ b/message.go @@ -181,6 +181,8 @@ func (m *Message) initReadLimit() { // canRead reports whether the amount of bytes can be stored safely. func (m *Message) canRead(sz Size) (ok bool) { + return true + m.rlimitInit.Do(m.initReadLimit) for { curr := m.rlimit.Load() diff --git a/my_test.go b/my_test.go index 4e9bb807..94f6d0a7 100644 --- a/my_test.go +++ b/my_test.go @@ -1,8 +1,13 @@ package capnp_test import ( + "encoding/binary" + "encoding/hex" + "math" "strings" "testing" + "time" + "unsafe" "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/internal/aircraftlib" @@ -119,7 +124,7 @@ func (x *CapNProtoSerializer) Marshal(o interface{}) ([]byte, error) { */ c := x.c - c.SetBirthDay(a.BirthDay.UnixNano()) + c.SetBirthDayp(a.BirthDay.UnixNano()) c.SetSiblingsp(int32(a.Siblings)) c.SetSpousep(a.Spouse) c.SetMoneyp(a.Money) // c.SetMoney(a.Money) @@ -130,23 +135,25 @@ func (x *CapNProtoSerializer) Marshal(o interface{}) ([]byte, error) { } func (x *CapNProtoSerializer) Unmarshal(d []byte, i interface{}) error { - /* - a := i.(*goserbench.SmallStruct) + a := i.(*goserbench.SmallStruct) - s, _, err := capn.ReadFromMemoryZeroCopy(d) - if err != nil { - return err + x.arena.ReplaceBuffer(d) - } - o := aircraftlib.ReadRootBenchmarkA(s) - a.Name = o.Name() - a.BirthDay = time.Unix(0, o.BirthDay()) - a.Phone = o.Phone() - a.Siblings = int(o.Siblings()) - a.Spouse = o.Spouse() - a.Money = o.Money() - return nil - */ + var err error + + c := x.c + a.Name, err = c.GetNameSuperUnsafe() //c.GetName() // c.Name() + if err != nil { + return err + } + a.BirthDay = time.Unix(0, c.GetBirthDay()) + a.Phone, err = c.GetPhoneSuperUnsafe() // c.GetPhone() // c.Phone() + if err != nil { + return err + } + a.Siblings = int(c.GetSiblings()) + a.Spouse = c.GetSpouse() + a.Money = c.GetMoney() return nil } @@ -163,7 +170,7 @@ func NewCapNProtoSerializer() goserbench.Serializer { if err := a.SetName(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { panic(err) } - if err := a.SetPhone(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { + if err := a.SetPhone(strings.Repeat("a", goserbench.MaxSmallStructPhoneSize)); err != nil { panic(err) } @@ -193,5 +200,273 @@ func BenchmarkGoserBench(b *testing.B) { b.Run("unmarshal", func(b *testing.B) { goserbench.BenchUnmarshalSmallStruct(b, NewCapNProtoSerializer(), false) }) +} + +func BenchmarkSetUint(b *testing.B) { + b.Run("baseline", func(b *testing.B) { + buf := make([]byte, 1024) + b.ResetTimer() + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(buf[48:], uint64(i)) + } + }) + b.Run("unsafe", func(b *testing.B) { + buf := make([]byte, 1024) + b.ResetTimer() + for i := 0; i < b.N; i++ { + *(*uint64)(unsafe.Pointer(&buf[48])) = uint64(i) + } + + }) + b.Run("capnp", func(b *testing.B) { + arena := &capnp.SimpleSingleSegmentArena{} + var msg capnp.Message + msg.ResetForRead(arena) + msg.ResetReadLimit(1 << 31) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + panic(err) + } + if err := a.SetName(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { + panic(err) + } + if err := a.SetPhone(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { + panic(err) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + a.SetBirthDayp(int64(i)) + } + }) +} + +func BenchmarkSetFloat(b *testing.B) { + arena := &capnp.SimpleSingleSegmentArena{} + var msg capnp.Message + msg.ResetForRead(arena) + msg.ResetReadLimit(1 << 31) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + panic(err) + } + if err := a.SetName(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { + panic(err) + } + if err := a.SetPhone(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { + panic(err) + } + + b.Run("baseline", func(b *testing.B) { + buf := make([]byte, 1024) + var f float64 + b.ResetTimer() + for i := 0; i < b.N; i++ { + f += float64(i) + binary.LittleEndian.PutUint64(buf[48:], math.Float64bits(f)) + } + + _ = hex.EncodeToString(buf) + }) + b.Run("capnp", func(b *testing.B) { + b.ResetTimer() + + var f float64 + for i := 0; i < b.N; i++ { + f += float64(i) + a.SetMoneyp(f) + } + + _ = hex.EncodeToString(arena.Segment(0).Data()) + }) + b.Run("uint", func(b *testing.B) { + b.ResetTimer() + + var f float64 + for i := 0; i < b.N; i++ { + f += float64(i) + a.SetBirthDayp(int64(math.Float64bits(f))) + } + + _ = hex.EncodeToString(arena.Segment(0).Data()) + }) + +} + +func BenchmarkSetBool(b *testing.B) { + buf := make([]byte, 1024) + b.Run("readandmask", func(b *testing.B) { + var bl bool + for i := 0; i < b.N; i++ { + //addr, off := i%len(buf), i%8 + addr, off := 12, 0 + + bl = !bl + aux := buf[addr] + if bl { + aux |= (1 << off) + } else { + aux &^= (1 << off) + } + buf[addr] = aux + } + }) + b.Run("directly", func(b *testing.B) { + var bl bool + for i := 0; i < b.N; i++ { + addr, off := i%len(buf), i%8 + + bl = !bl + if bl { + buf[addr] |= (1 << off) + } else { + buf[addr] &^= (1 << off) + } + } + }) + b.Run("xor", func(b *testing.B) { + var bl bool + for i := 0; i < b.N; i++ { + addr, off := i%len(buf), i%8 + + bl = !bl + + aux := byte(1 << off) + if bl { + aux ^= 0xff + } + buf[addr] ^= aux + } + }) + + arena := &capnp.SimpleSingleSegmentArena{} + var msg capnp.Message + msg.ResetForRead(arena) + msg.ResetReadLimit(1 << 31) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + panic(err) + } + if err := a.SetName(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { + panic(err) + } + if err := a.SetPhone(strings.Repeat("a", goserbench.MaxSmallStructNameSize)); err != nil { + panic(err) + } + + b.Run("capnp", func(b *testing.B) { + var bl bool + for i := 0; i < b.N; i++ { + bl = !bl + a.SetSpousep(bl) + } + }) + +} + +func sliceAt32(b []byte) []byte { + return b[32:] +} + +func BenchmarkGetUint64(b *testing.B) { + buf := make([]byte, 1024) + const target = int64(0x17010666) + binary.LittleEndian.PutUint64(buf[32:], uint64(target)) + b.Run("baseline", func(b *testing.B) { + for i := 0; i < b.N; i++ { + v := binary.LittleEndian.Uint64(buf[32:]) + if int64(v) != target { + panic("boo") + } + } + }) + + b.Run("func", func(b *testing.B) { + for i := 0; i < b.N; i++ { + v := binary.LittleEndian.Uint64(sliceAt32(buf)) + if int64(v) != target { + panic("boo") + } + } + }) + + arena := &capnp.SimpleSingleSegmentArena{} + var msg capnp.Message + msg.ResetForRead(arena) + msg.ResetReadLimit(1 << 31) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + panic(err) + } + + a.SetBirthDay(int64(target)) + + b.Run("capnp", func(b *testing.B) { + for i := 0; i < b.N; i++ { + v := a.GetBirthDay() + if v != target { + panic("boo") + } + } + }) +} + +func BenchmarkGetText(b *testing.B) { + const target = "i am the walrus!" + buf := make([]byte, 1024) + copy(buf[32:], target) + b.Run("baseline", func(b *testing.B) { + for i := 0; i < b.N; i++ { + tb := buf[32 : 32+16] + v := *(*string)(unsafe.Pointer(&tb)) + if v != target { + b.Fatalf("v %q", v) + } + } + }) + + arena := &capnp.SimpleSingleSegmentArena{} + var msg capnp.Message + msg.ResetForRead(arena) + msg.ResetReadLimit(1 << 31) + + a, err := aircraftlib.AllocateNewRootBenchmark(&msg) + if err != nil { + panic(err) + } + + err = a.SetName(target) + if err != nil { + b.Fatal(err) + } + + b.Run("capnp", func(b *testing.B) { + for i := 0; i < b.N; i++ { + v, err := a.GetName() + if err != nil { + b.Fatal(err) + } + if v != target { + b.Fatalf("v %q", v) + } + } + }) + + b.Run("superunsafe", func(b *testing.B) { + for i := 0; i < b.N; i++ { + v, err := a.GetNameSuperUnsafe() + if err != nil { + b.Fatal(err) + } + if v != target { + b.Fatalf("v %q", v) + } + } + }) } diff --git a/pointer.go b/pointer.go index d5b50483..ad0b4e65 100644 --- a/pointer.go +++ b/pointer.go @@ -150,6 +150,20 @@ func (p Ptr) text() (b []byte, ok bool) { return b[: len(b)-1 : len(b)], true } +func (p *Ptr) textp() (b []byte, ok bool) { + // if !isOneByteList(p) { + if !(p.seg != nil && p.flags.ptrType() == listPtrType && p.size.isOneByte() && p.flags.listFlags()&isCompositeList == 0) { + return nil, false + } + + b = p.seg.slice(p.off, Size(p.lenOrCap)) + if len(b) == 0 || b[len(b)-1] != 0 { + // Text must be null-terminated. + return nil, false + } + return b[: len(b)-1 : len(b)], true +} + // Data attempts to convert p into Data, returning nil if p is not a // valid 1-byte list pointer. func (p Ptr) Data() []byte { diff --git a/rawpointer.go b/rawpointer.go index 2a5aeb61..472494db 100644 --- a/rawpointer.go +++ b/rawpointer.go @@ -15,6 +15,10 @@ func (off pointerOffset) resolve(base address) (_ address, ok bool) { return base.element(int32(off), wordSize) } +func (off pointerOffset) resolveUnsafe(base address) address { + return base + address(off)*address(wordSize) +} + // nearPointerOffset computes the offset for a pointer at paddr to point to addr. func nearPointerOffset(paddr, addr address) pointerOffset { return pointerOffset(addr/address(wordSize) - paddr/address(wordSize) - 1) diff --git a/segment.go b/segment.go index c5226020..6b81ac0d 100644 --- a/segment.go +++ b/segment.go @@ -3,6 +3,7 @@ package capnp import ( "encoding/binary" "errors" + "math" "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/internal/str" @@ -98,8 +99,13 @@ func (s *Segment) writeUint32(addr address, val uint32) { func (s *Segment) writeUint64(addr address, val uint64) { binary.LittleEndian.PutUint64(s.slice(addr, 8), val) + // binary.LittleEndian.AppendUint64(s.data[addr:addr], val) } +func (s *Segment) writeFloat64(addr address, val float64) { + binary.LittleEndian.PutUint64(s.slice(addr, 8), math.Float64bits(val)) + // binary.LittleEndian.AppendUint64(s.data[addr:addr], math.Float64bits(val)) +} func (s *Segment) writeRawPointer(addr address, val rawPointer) { s.writeUint64(addr, uint64(val)) } diff --git a/struct.go b/struct.go index 200d1772..dd722066 100644 --- a/struct.go +++ b/struct.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "unsafe" "capnproto.org/go/capnp/v3/exc" @@ -163,6 +164,163 @@ func (p Struct) SetText(i uint16, v string) error { return p.SetNewText(i, v) } +func (p *Struct) GetText(i uint16) (string, error) { + // p.Ptr(i) + if p.seg == nil || i >= p.size.PointerCount { + return "", nil + } + + // p.pointerAddress(i) + ptrStart, _ := p.off.addSize(p.size.DataSize) + addr, _ := ptrStart.element(int32(i), wordSize) + + ptr, err := p.seg.readPtr(addr, p.depthLimit) + if err != nil { + return "", err + } + + // return ptr.Text(), nil + + tb, ok := ptr.textp() + if !ok { + return "", nil + } + + return string(tb), nil +} + +func (p *Struct) GetTextUnsafe(i uint16) (string, error) { + // p.Ptr(i) + if p.seg == nil || i >= p.size.PointerCount { + return "", nil + } + + // p.pointerAddress(i) + ptrStart, _ := p.off.addSize(p.size.DataSize) // Start of pointers in struct (could be cached) + addr, _ := ptrStart.element(int32(i), wordSize) // Address of ith pointer + + // ptr, err := p.seg.readPtr(addr, p.depthLimit) + s, base, val, err := p.seg.resolveFarPointer(addr) // Read pointer (panics if out of bounds, but validated by initial check) + + if err != nil { + return "", exc.WrapError("read pointer", err) + } + if val == 0 { + return "", nil + } + if p.depthLimit == 0 { + return "", errors.New("read pointer: depth limit reached") + } + if val.pointerType() != listPointer { + return "", fmt.Errorf("not a list pointer") + } + /* + lp, err := s.readListPtr(base, val) + if err != nil { + return "", exc.WrapError("read pointer", err) + } + */ + if val.listType() != byte1List { + return "", fmt.Errorf("not a byte1list") + } + laddr, ok := val.offset().resolve(base) + if !ok { + return "", errors.New("list pointer: invalid address") + } + /* + if !s.msg.canRead(lp.readSize()) { + return "", errors.New("read pointer: read traversal limit reached") + } + lp.depthLimit = p.depthLimit - 1 + */ + // return ptr.Text(), nil + + // tb, ok := ptr.textp() + tb := s.slice(laddr, Size(val.numListElements())) + end := len(tb) - 1 + trimmed := false + for ; end >= 0 && tb[end] == 0; end-- { + trimmed = true + } + if !trimmed { + // Not null terminated. + return "", nil + } + tb = tb[:end+1] + + return *(*string)(unsafe.Pointer(&tb)), nil +} + +func (p *Struct) GetTextSuperUnsafe(i uint16) (string, error) { + // p.Ptr(i) + // Bounds check pointer address within range. + if p.seg == nil || i >= p.size.PointerCount { + return "", nil + } + + // p.pointerAddress(i) + ptrStart := p.off + address(p.size.DataSize) + addr := ptrStart + address(i)*address(wordSize) + + // ptr, err := p.seg.readPtr(addr, p.depthLimit) + s, base, val, _ := p.seg.resolveFarPointer(addr) // Read pointer (panics if out of bounds, but validated by initial check) + /* + if err != nil { + return "", exc.WrapError("read pointer", err) + } + */ + if val == 0 { + return "", nil + } + /* + if p.depthLimit == 0 { + return "", errors.New("read pointer: depth limit reached") + } + if val.pointerType() != listPointer { + return "", fmt.Errorf("not a list pointer") + } + /* + lp, err := s.readListPtr(base, val) + if err != nil { + return "", exc.WrapError("read pointer", err) + } + */ + /* + if val.listType() != byte1List { + return "", fmt.Errorf("not a byte1list") + } + */ + laddr := val.offset().resolveUnsafe(base) + /* + laddr, ok := val.offset().resolve(base) + if !ok { + return "", errors.New("list pointer: invalid address") + } + */ + /* + if !s.msg.canRead(lp.readSize()) { + return "", errors.New("read pointer: read traversal limit reached") + } + lp.depthLimit = p.depthLimit - 1 + */ + // return ptr.Text(), nil + + // tb, ok := ptr.textp() + tb := s.slice(laddr, Size(val.numListElements())) + end := len(tb) - 1 + trimmed := false + for ; end >= 0 && tb[end] == 0; end-- { + trimmed = true + } + if !trimmed { + // Not null terminated. + return "", nil + } + tb = tb[:end+1] + + return *(*string)(unsafe.Pointer(&tb)), nil +} + func (p Struct) FlatSetNewText(i uint16, v string) error { // NewText().newPrimitiveList sz := Size(1) @@ -380,6 +538,11 @@ func (p Struct) Bit(n BitOffset) bool { return p.seg.readUint8(addr)&n.mask() != 0 } +func (p *Struct) Bitp(n BitOffset) bool { + addr := p.dataAddressUnchecked(n.offset()) + return p.seg.readUint8(addr)&n.mask() != 0 +} + // SetBit sets the bit that is n bits from the start of the struct to v. func (p Struct) SetBit(n BitOffset, v bool) { if !p.bitInData(n) { @@ -396,17 +559,25 @@ func (p Struct) SetBit(n BitOffset, v bool) { } func (p *Struct) SetBitp(n BitOffset, v bool) { - if !p.bitInData(n) { - panic("capnp: set field outside struct boundaries") - } - addr := p.off.addOffset(n.offset()) - b := p.seg.readUint8(addr) + addr := p.dataAddressUnchecked(n.offset()) if v { - b |= n.mask() + p.seg.setBit(addr, uint8(n%8)) } else { - b &^= n.mask() + p.seg.clearBit(addr, uint8(n%8)) } - p.seg.writeUint8(addr, b) + /* + if !p.bitInData(n) { + panic("capnp: set field outside struct boundaries") + } + addr := p.off.addOffset(n.offset()) + b := p.seg.readUint8(addr) + if v { + b |= n.mask() + } else { + b &^= n.mask() + } + p.seg.writeUint8(addr, b) + */ } func (p *Struct) dataAddress(off DataOffset, sz Size) (addr address, ok bool) { @@ -414,6 +585,11 @@ func (p *Struct) dataAddress(off DataOffset, sz Size) (addr address, ok bool) { return 0, false } return p.off.addOffset(off), true + // return p.off + address(off), true +} + +func (p *Struct) dataAddressUnchecked(off DataOffset) (addr address) { + return p.off + address(off) } // Uint8 returns an 8-bit integer from the struct's data section. @@ -443,6 +619,15 @@ func (p Struct) Uint32(off DataOffset) uint32 { return p.seg.readUint32(addr) } +// Uint32p returns a 32-bit integer from the struct's data section. +func (p *Struct) Uint32p(off DataOffset) uint32 { + addr, ok := p.dataAddress(off, 4) + if !ok { + return 0 + } + return p.seg.readUint32(addr) +} + // Uint64 returns a 64-bit integer from the struct's data section. func (p Struct) Uint64(off DataOffset) uint64 { addr, ok := p.dataAddress(off, 8) @@ -452,6 +637,18 @@ func (p Struct) Uint64(off DataOffset) uint64 { return p.seg.readUint64(addr) } +func (p *Struct) Uint64p(off DataOffset) uint64 { + + /* + addr, ok := p.dataAddress(off, 8) + if !ok { + return 0 + } + */ + addr := p.dataAddressUnchecked(off) + return p.seg.readUint64(addr) +} + // SetUint8 sets the 8-bit integer that is off bytes from the start of the struct to v. func (p Struct) SetUint8(off DataOffset, v uint8) { addr, ok := p.dataAddress(off, 1) @@ -532,6 +729,18 @@ func (p *Struct) SetUint64p(off DataOffset, v uint64) { } +func (p *Struct) SetFloat64p(off DataOffset, v float64) { + /* + addr, ok := p.dataAddress(off, 8) + if !ok { + panic("capnp: set field outside struct boundaries") + } + */ + addr := p.dataAddressUnchecked(off) + + p.seg.writeFloat64(addr, v) +} + // structFlags is a bitmask of flags for a pointer. type structFlags uint8 From d8b39cc19dd3d8a63de8ef3be718bfa1b359a092 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Tue, 18 Jun 2024 15:39:50 -0300 Subject: [PATCH 18/19] more bench wip --- internal/aircraftlib/experimental.go | 8 +- my_test.go | 140 ++++++++++++++++++++++++++- struct.go | 2 +- 3 files changed, 140 insertions(+), 10 deletions(-) diff --git a/internal/aircraftlib/experimental.go b/internal/aircraftlib/experimental.go index 2fbf3c01..7190fa43 100644 --- a/internal/aircraftlib/experimental.go +++ b/internal/aircraftlib/experimental.go @@ -18,8 +18,8 @@ func AllocateNewRootBenchmark(msg *capnp.Message) (BenchmarkA, error) { // // If the unrolled version is deemed good enough, it would just be replaced // inside SetNewText, without having to alter the public API. -func (s BenchmarkA) FlatSetName(v string) error { - return capnp.Struct(s).FlatSetNewText(0, v) +func (s *BenchmarkA) FlatSetName(v string) error { + return (*capnp.Struct)(s).FlatSetNewText(0, v) } // Experimental: update the set in-place. @@ -32,8 +32,8 @@ func (s BenchmarkA) NameField() (capnp.TextField, error) { return capnp.Struct(s).TextField(0) } -func (s BenchmarkA) FlatSetPhone(v string) error { - return capnp.Struct(s).FlatSetNewText(1, v) +func (s *BenchmarkA) FlatSetPhone(v string) error { + return (*capnp.Struct)(s).FlatSetNewText(1, v) } func (s *BenchmarkA) GetName() (string, error) { diff --git a/my_test.go b/my_test.go index 94f6d0a7..594d3b39 100644 --- a/my_test.go +++ b/my_test.go @@ -99,7 +99,7 @@ func BenchmarkSetTextBaselineCopyClear(b *testing.B) { } } -type CapNProtoSerializer struct { +type CapNProtoUnsafeSerializer struct { msg capnp.Message arena *capnp.SimpleSingleSegmentArena c *aircraftlib.BenchmarkA @@ -108,7 +108,7 @@ type CapNProtoSerializer struct { fieldPhone *capnp.TextField } -func (x *CapNProtoSerializer) Marshal(o interface{}) ([]byte, error) { +func (x *CapNProtoUnsafeSerializer) Marshal(o interface{}) ([]byte, error) { a := o.(*goserbench.SmallStruct) /* x.msg.ResetForRead(x.arena) @@ -134,7 +134,7 @@ func (x *CapNProtoSerializer) Marshal(o interface{}) ([]byte, error) { return x.arena.Data(0) } -func (x *CapNProtoSerializer) Unmarshal(d []byte, i interface{}) error { +func (x *CapNProtoUnsafeSerializer) Unmarshal(d []byte, i interface{}) error { a := i.(*goserbench.SmallStruct) x.arena.ReplaceBuffer(d) @@ -157,7 +157,7 @@ func (x *CapNProtoSerializer) Unmarshal(d []byte, i interface{}) error { return nil } -func NewCapNProtoSerializer() goserbench.Serializer { +func NewCapNProtoUnsafeSerializer() goserbench.Serializer { arena := &capnp.SimpleSingleSegmentArena{} var msg capnp.Message msg.ResetForRead(arena) @@ -183,7 +183,7 @@ func NewCapNProtoSerializer() goserbench.Serializer { panic(err) } - return &CapNProtoSerializer{ + return &CapNProtoUnsafeSerializer{ arena: arena, c: &a, fieldName: &fieldName, @@ -192,6 +192,71 @@ func NewCapNProtoSerializer() goserbench.Serializer { } +func BenchmarkGoserBenchUnsafe(b *testing.B) { + b.Run("marshal", func(b *testing.B) { + goserbench.BenchMarshalSmallStruct(b, NewCapNProtoUnsafeSerializer()) + }) + + b.Run("unmarshal", func(b *testing.B) { + goserbench.BenchUnmarshalSmallStruct(b, NewCapNProtoUnsafeSerializer(), false) + }) +} + +type CapNProtoSerializer struct { +} + +func (x *CapNProtoSerializer) Marshal(o interface{}) ([]byte, error) { + msg, seg := capnp.NewSingleSegmentMessage(nil) + + // c, err := aircraftlib.AllocateNewRootBenchmark(msg) + c, err := aircraftlib.NewRootBenchmarkA(seg) + if err != nil { + return nil, err + } + + a := o.(*goserbench.SmallStruct) + c.SetBirthDayp(a.BirthDay.UnixNano()) + c.SetSiblingsp(int32(a.Siblings)) + c.SetSpousep(a.Spouse) + c.SetMoneyp(a.Money) // c.SetMoney(a.Money) + c.FlatSetName(a.Name) // c.SetName(a.Name) + c.FlatSetPhone(a.Phone) // c.SetPhone(a.Phone) + + return msg.Marshal() +} + +func (x *CapNProtoSerializer) Unmarshal(d []byte, i interface{}) error { + a := i.(*goserbench.SmallStruct) + + msg, err := capnp.Unmarshal(d) + if err != nil { + return err + } + + c, err := aircraftlib.ReadRootBenchmarkA(msg) + if err != nil { + return err + } + + a.Name, err = c.Name() + if err != nil { + return err + } + a.BirthDay = time.Unix(0, c.GetBirthDay()) + a.Phone, err = c.Phone() + if err != nil { + return err + } + a.Siblings = int(c.GetSiblings()) + a.Spouse = c.GetSpouse() + a.Money = c.GetMoney() + return nil +} + +func NewCapNProtoSerializer() goserbench.Serializer { + return &CapNProtoSerializer{} +} + func BenchmarkGoserBench(b *testing.B) { b.Run("marshal", func(b *testing.B) { goserbench.BenchMarshalSmallStruct(b, NewCapNProtoSerializer()) @@ -202,6 +267,71 @@ func BenchmarkGoserBench(b *testing.B) { }) } +type CapNProtoImprovSerializer struct { +} + +func (x *CapNProtoImprovSerializer) Marshal(o interface{}) ([]byte, error) { + msg, seg := capnp.NewSingleSegmentMessage(nil) + + // c, err := aircraftlib.AllocateNewRootBenchmark(msg) + c, err := aircraftlib.NewRootBenchmarkA(seg) + if err != nil { + return nil, err + } + + a := o.(*goserbench.SmallStruct) + c.SetBirthDayp(a.BirthDay.UnixNano()) + c.SetSiblingsp(int32(a.Siblings)) + c.SetSpousep(a.Spouse) + c.SetMoneyp(a.Money) // c.SetMoney(a.Money) + c.FlatSetName(a.Name) // c.SetName(a.Name) + c.FlatSetPhone(a.Phone) // c.SetPhone(a.Phone) + + return msg.Marshal() +} + +func (x *CapNProtoImprovSerializer) Unmarshal(d []byte, i interface{}) error { + a := i.(*goserbench.SmallStruct) + + msg, err := capnp.Unmarshal(d) + if err != nil { + return err + } + + c, err := aircraftlib.ReadRootBenchmarkA(msg) + if err != nil { + return err + } + + a.Name, err = c.Name() + if err != nil { + return err + } + a.BirthDay = time.Unix(0, c.GetBirthDay()) + a.Phone, err = c.Phone() + if err != nil { + return err + } + a.Siblings = int(c.GetSiblings()) + a.Spouse = c.GetSpouse() + a.Money = c.GetMoney() + return nil +} + +func NewCapNProtoImprovSerializer() goserbench.Serializer { + return &CapNProtoSerializer{} +} + +func BenchmarkGoserBenchImprov(b *testing.B) { + b.Run("marshal", func(b *testing.B) { + goserbench.BenchMarshalSmallStruct(b, NewCapNProtoImprovSerializer()) + }) + + b.Run("unmarshal", func(b *testing.B) { + goserbench.BenchUnmarshalSmallStruct(b, NewCapNProtoImprovSerializer(), false) + }) +} + func BenchmarkSetUint(b *testing.B) { b.Run("baseline", func(b *testing.B) { buf := make([]byte, 1024) diff --git a/struct.go b/struct.go index dd722066..fcfce400 100644 --- a/struct.go +++ b/struct.go @@ -321,7 +321,7 @@ func (p *Struct) GetTextSuperUnsafe(i uint16) (string, error) { return *(*string)(unsafe.Pointer(&tb)), nil } -func (p Struct) FlatSetNewText(i uint16, v string) error { +func (p *Struct) FlatSetNewText(i uint16, v string) error { // NewText().newPrimitiveList sz := Size(1) n := int32(len(v) + 1) From 89a28b63d84a7baa9d9e6742a7f0ac4869228480 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Tue, 18 Jun 2024 16:26:45 -0300 Subject: [PATCH 19/19] more improv --- internal/aircraftlib/experimental.go | 1 - my_test.go | 27 ++++++++++++++++++++++----- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/internal/aircraftlib/experimental.go b/internal/aircraftlib/experimental.go index 7190fa43..e768bad1 100644 --- a/internal/aircraftlib/experimental.go +++ b/internal/aircraftlib/experimental.go @@ -11,7 +11,6 @@ import ( func AllocateNewRootBenchmark(msg *capnp.Message) (BenchmarkA, error) { st, err := capnp.AllocateRootStruct(msg, capnp.ObjectSize{DataSize: 24, PointerCount: 2}) return BenchmarkA(st), err - } // Exprimental: set the name using the flat/unrolled version of SetNewText. diff --git a/my_test.go b/my_test.go index 594d3b39..6bda8bfd 100644 --- a/my_test.go +++ b/my_test.go @@ -268,13 +268,21 @@ func BenchmarkGoserBench(b *testing.B) { } type CapNProtoImprovSerializer struct { + arena capnp.Arena + msg *capnp.Message } func (x *CapNProtoImprovSerializer) Marshal(o interface{}) ([]byte, error) { - msg, seg := capnp.NewSingleSegmentMessage(nil) + // x.msg.Release() + x.msg.ResetForRead(x.arena) + seg, err := x.msg.Segment(0) + if err != nil { + return nil, err + } - // c, err := aircraftlib.AllocateNewRootBenchmark(msg) - c, err := aircraftlib.NewRootBenchmarkA(seg) + c, err := aircraftlib.AllocateNewRootBenchmark(x.msg) + _ = seg + // c, err := aircraftlib.NewRootBenchmarkA(seg) if err != nil { return nil, err } @@ -287,7 +295,7 @@ func (x *CapNProtoImprovSerializer) Marshal(o interface{}) ([]byte, error) { c.FlatSetName(a.Name) // c.SetName(a.Name) c.FlatSetPhone(a.Phone) // c.SetPhone(a.Phone) - return msg.Marshal() + return x.msg.Marshal() } func (x *CapNProtoImprovSerializer) Unmarshal(d []byte, i interface{}) error { @@ -319,7 +327,16 @@ func (x *CapNProtoImprovSerializer) Unmarshal(d []byte, i interface{}) error { } func NewCapNProtoImprovSerializer() goserbench.Serializer { - return &CapNProtoSerializer{} + // arena := capnp.SingleSegment(nil) + arena := &capnp.SimpleSingleSegmentArena{} + msg, _, err := capnp.NewMessage(arena) + if err != nil { + panic(err) + } + return &CapNProtoImprovSerializer{ + arena: arena, + msg: msg, + } } func BenchmarkGoserBenchImprov(b *testing.B) {