Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
thedevop committed Mar 26, 2024
1 parent 546e2fc commit 4cce60a
Show file tree
Hide file tree
Showing 7 changed files with 475 additions and 0 deletions.
11 changes: 11 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/thedevop/lockfree

go 1.21.0

require github.com/stretchr/testify v1.9.0

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
8 changes: 8 additions & 0 deletions list/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package list

import "sync/atomic"

type node[T any] struct {
value T
next atomic.Pointer[node[T]]
}
66 changes: 66 additions & 0 deletions list/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package list

import (
"sync/atomic"
)

// Queue is a lockfree implementation of queue (FIFO).
type Queue[T any] struct {
head, tail atomic.Pointer[node[T]]
count atomic.Int64
}

// NewQueue returns a Queue (FIFO).
func NewQueue[T any]() *Queue[T] {
queue := &Queue[T]{}
placeholder := &node[T]{}
queue.head.Store(placeholder)
queue.tail.Store(placeholder)
return queue
}

// Enqueue an element at the end of the queue.
func (q *Queue[T]) Enqueue(v T) {
node := &node[T]{value: v}
for {
tail := q.tail.Load()

if q.tail.CompareAndSwap(tail, node) {
tail.next.Store(node)
q.count.Add(1)
return
}
}
}

// Dequeue removes and returns the head element of the queue. If queue is empty, ok returns false.
func (q *Queue[T]) Dequeue() (v T, ok bool) {
for {
head := q.head.Load()
next := head.next.Load()
if next == nil {
return
}

if q.head.CompareAndSwap(head, next) {
q.count.Add(-1)
return next.value, true
}
}
}

// Peek the head element in the queue without removing. If queue is empty, ok returns false.
func (q *Queue[T]) Peek() (v T, ok bool) {
head := q.head.Load()
next := head.next.Load()
if next == nil {
return
}

return next.value, true
}

// Len returns number of elements in the queue.
func (q *Queue[T]) Len() int {
return int(q.count.Load())
}
126 changes: 126 additions & 0 deletions list/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package list

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestNewQueue(t *testing.T) {
q := NewQueue[int]()
require.NotNil(t, q)
require.IsType(t, &Queue[int]{}, q)
}

func TestQueueEnqueue(t *testing.T) {
v := "test"
q := NewQueue[string]()
q.Enqueue(v)
require.Equal(t, v, q.head.Load().next.Load().value)
}

func TestQueueDequeue(t *testing.T) {
v := "test"
q := NewQueue[string]()
q.Enqueue(v)
r, ok := q.Dequeue()
require.True(t, ok)
require.Equal(t, v, r)
}

func TestQueue(t *testing.T) {
const Threads = 100
const Count = 10000

results := testQueue(Threads, Threads, Count, false)

combinedResults := make(map[int]bool)
for i := 0; i < Threads; i++ {
for k, v := range results[i] {
combinedResults[k] = v
}
}

require.Equal(t, Threads*Count, len(combinedResults))

for i := 0; i < Threads; i++ {
start := i * Count
end := start + Count
for n := start; n < end; n++ {
_, ok := combinedResults[n]
require.True(t, ok)
}
}
}

func testQueue(wthreads, rthreads, count int, sequential bool) []map[int]bool {
q := NewQueue[int]()

ctx, cancel := context.WithCancel(context.Background())

results := make([]map[int]bool, rthreads)
enq := sync.WaitGroup{}
enq.Add(rthreads)
for i := 0; i < rthreads; i++ {
result := make(map[int]bool)
results[i] = result
go func(i int) {
defer enq.Done()
if sequential {
<-ctx.Done()
}
for {
v, ok := q.Dequeue()
if ok {
result[v] = true
}
if ctx.Err() != nil && q.Len() == 0 {
break
}
}
}(i)
}

deq := sync.WaitGroup{}
deq.Add(wthreads)
for i := 0; i < wthreads; i++ {
go func(i int) {
defer deq.Done()
start := i * count
end := start + count
for n := start; n < end; n++ {
q.Enqueue(n)
}
}(i)
}

deq.Wait()
cancel()
enq.Wait()

return results
}

func BenchmarkQueue(b *testing.B) {
const Threads = 100
const Count = 10000

b.ReportAllocs()

for i := 0; i < b.N; i++ {
testQueue(Threads, Threads, Count, false)
}
}

func BenchmarkQueueSeq(b *testing.B) {
const Threads = 100
const Count = 10000

b.ReportAllocs()

for i := 0; i < b.N; i++ {
testQueue(Threads, Threads, Count, true)
}
}
58 changes: 58 additions & 0 deletions list/stack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package list

import "sync/atomic"

// Stack is a lockfree implementation of stack (LIFO).
type Stack[T any] struct {
head atomic.Pointer[node[T]]
count atomic.Int64
}

// NewStack returns a Stack (LIFO).
func NewStack[T any]() *Stack[T] {
return &Stack[T]{}
}

// Push an element to the head.
func (s *Stack[T]) Push(v T) {
node := &node[T]{value: v}
for {
head := s.head.Load()
node.next.Store(head)

if s.head.CompareAndSwap(head, node) {
s.count.Add(1)
return
}
}
}

// Pop an element from the head. If stack is empty, ok returns false.
func (s *Stack[T]) Pop() (v T, ok bool) {
for {
head := s.head.Load()
if head == nil {
return
}

if s.head.CompareAndSwap(head, head.next.Load()) {
s.count.Add(-1)
return head.value, true
}
}
}

// Peek the head element in the stack. If stack is empty, ok returns false.
func (s *Stack[T]) Peek() (v T, ok bool) {
head := s.head.Load()
if head == nil {
return
}

return head.value, true
}

// Len returns number of elements in the stack
func (s *Stack[T]) Len() int {
return int(s.count.Load())
}
Loading

0 comments on commit 4cce60a

Please sign in to comment.