Skip to content

Commit

Permalink
Simplify implementation
Browse files Browse the repository at this point in the history
Replace internal fiber collection with an internal counter. This
is mainly to reduce memory footprint for long running processes.
  • Loading branch information
akadusei committed Apr 15, 2024
1 parent 68dd2f4 commit 8c2301e
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 120 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased] -

### Changed
- Replace internal fiber collection with an internal counter

### Fixed
- Reduce memory footprint for long running processes

### Removed
- Remove `Pond#<<` method
- Remove `Pond.drain` methods
- Remove `Pond.fill(Fiber)` overload
- Remove `Pond.fill(Array(Fiber))` overload

## [1.0.1] - 2024-02-14

### Fixed
Expand Down
44 changes: 5 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Pond

**Pond** is a *Crystal* implementation of a *WaitGroup*, without channels or counters. *Pond* automatically keeps track of all fibers it is made aware of, and waits until all of them complete execution.
**Pond** is a *Crystal* implementation of a *WaitGroup*, without channels or explicit counters. *Pond* automatically keeps track of all its fibers, and waits until all of them complete execution.

## Installation

Expand All @@ -16,21 +16,7 @@

## Usage

- Add fibers and wait on them:

```crystal
require "pond"
pond = Pond.new
1000.times do |_|
pond << spawn { do_work }
end
pond.drain # <= Waits for fibers to complete
```

- Let *Pond* spawn fibers and wait on them:
- Spawn fibers and wait on them:

```crystal
require "pond"
Expand All @@ -41,10 +27,10 @@
pond.fill { do_work } # <= Spawns fiber and passes block to it
end
pond.drain
pond.drain # <= Waits for fibers to complete
```

- You may add *nested* fibers:
- You may spawn *nested* fibers:

In this case, all *ancestor* fibers have to be added to the pond, otherwise *Pond* can't guarantee any of them would complete.

Expand All @@ -62,7 +48,7 @@
pond.drain
```

Note that, while you can add fibers to a pond that was created in a another fiber, draining has to be done in the same fiber the pond was created in. This is to prevent potential deadlocks.
Note that, while you can fill a pond that was created in a another fiber, draining has to be done in the same fiber the pond was created in. This is to prevent potential deadlocks.

```crystal
require "pond"
Expand All @@ -74,26 +60,6 @@
spawn { pond.drain } # <= Error!
````
- Wait on a single existing fiber:
```crystal
require "pond"

fiber = spawn { do_work }

Pond.drain(fiber) # <= Waits until fiber completes
```
- Wait on multiple existing fibers:
```crystal
require "pond"

fibers = Array(Fiber).new(5, spawn { do_work })

Pond.drain(fibers) # <= Waits until all fibers complete
```
## Development
Run tests with `crystal spec -Dpreview_mt`. You may set `CRYSTAL_WORKERS` environment variable with `export CRYSTAL_WORKERS=<number>`, before running tests.
Expand Down
14 changes: 2 additions & 12 deletions spec/pond_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ describe Pond do

pond.fill do
pond.fill do
pond << spawn { count.add(1) }
pond.fill { count.add(1) }
end
end

Expand Down Expand Up @@ -60,7 +60,7 @@ describe Pond do

pond.fill do
pond.fill do
pond << spawn { }
pond.fill { }
end
end

Expand All @@ -77,14 +77,4 @@ describe Pond do
pond.size.should eq(0)
end
end

describe ".drain" do
it "waits for fiber to complete" do
count = Atomic(Int32).new(0)
fiber = spawn { count.add(1) }

Pond.drain(fiber)
count.lazy_get.should eq(1)
end
end
end
76 changes: 7 additions & 69 deletions src/pond.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,96 +3,34 @@ require "./pond/**"

class Pond
def initialize
@fibers = Array(Fiber).new
@counter = Atomic(Int32).new(0)
@fiber = Fiber.current
@mutex = Mutex.new
end

def self.new(fibers) : self
new.fill(fibers)
end

def fill(fibers)
fibers.each { |fiber| fill(fiber) }
self
end

def <<(fiber)
fill(fiber)
end

def fill(name = nil, same_thread = nil, &block)
fill spawn(name: name, same_thread: same_thread, &block)
end
@counter.add(1)

def fill(fiber : Fiber)
unless fiber.dead?
sync do
@fibers << fiber
@done = false
remove_dead_fibers
end
spawn(name: name, same_thread: same_thread) do
block.call
ensure
@counter.sub(1)
end

self
end

def drain : Nil
ensure_same_fiber
return unless @done == false

until size == 0
sleep 1.microsecond
end

sync do
@fibers.clear
@done = nil unless @done
end

until @done
sleep 1.microsecond
end
end

def size
sync { @fibers.count(&.dead?.!) }
end

def self.drain(fiber : Fiber)
drain({fiber})
end

def self.drain(fibers : Indexable(Fiber))
new(fibers).drain
end

private def remove_dead_fibers
return if @removing_dead

spawn do
until @done.nil?
sync { @fibers.reject!(&.dead?) }
sleep 1.microsecond
end

sync do
@removing_dead = false
@done = true
end
end

@removing_dead = true
@counter.get
end

private def ensure_same_fiber
return if @fiber == Fiber.current

sync { @done = nil }
raise Error.new("Cannot drain pond from another fiber")
end

private def sync
@mutex.synchronize { yield }
end
end

0 comments on commit 8c2301e

Please sign in to comment.