Skip to content

Commit

Permalink
fix: make corrections in buffered queue post
Browse files Browse the repository at this point in the history
  • Loading branch information
dhruv-ahuja committed Sep 6, 2023
1 parent c701027 commit 5cd6d25
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions content/posts/implementing_buffered_queue_in_rust.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ date = "2023-09-06"

## Introduction

O’Reilly’s `Programming Rust` book walks us through optimizing a part of a pipeline, in Chapter 19 `Concurrency`. It explains how a channel-based pipeline can encounter slowdowns and high memory usage if one of the consumer threads is much slower than one of the producer threads. The producer keeps adding tasks to the queue, but the consumer is unable to consume them at a satisfactory pace. The queue will have a large amount of unconsumed data causing high-memory usage. Defining fixed capacities will lower memory consumption in applications without affecting the latencies since the consumer already processes at its own fixed pace.
O’Reilly’s `Programming Rust` book walks us through optimizing a part of a pipeline, in Chapter 19 `Concurrency`. It explains how a channel-based pipeline can encounter slowdowns and high memory usage if one of the consumer threads is much slower than one of the producer threads. The producer keeps adding tasks to the queue, but the consumer is unable to consume them at a satisfactory pace. The queue will have a large amount of unconsumed data causing memory spikes. Defining fixed capacities will lower memory consumption in applications without affecting the latencies since the consumer already consumes at its own fixed pace.

I had known about queues but had never thought about them in a larger scope, so I thought attempting a custom implementation would be a good way to learn more. I received a lot of help from the Rust community for this project, allowing me to better understand the concepts and improve my code :)  

## Overview

We will walk through the implementation of a simple multi-threaded, blocking, buffered queue. The Producer thread will push elements till the queue is at capacity, and block until the queue has space again. Similarly, the Consumer thread will consume elements till the queue is empty, and block until it has elements again. We do not persist the threads once the input stream is extinguished.
We will walk through the implementation of a simple multi-threaded, blocking, buffered queue. The Producer thread will push elements till the queue is at capacity, and block until the queue has space again. Similarly, the Consumer thread will consume elements till the queue is empty, and block until it has elements again. We do not persist the threads once the input stream is expended.

## Declaring our Types

We can create a new project with `cargo new buffered-queue-rs` and put our queue logic in `src/lib.rs`, marking all code inside the file as library code and making it accessible to the whole project by importing it with the project name specified in the `cargo new` command.
We can create a new project with `cargo new buffered-queue-rs` and put our queue logic in `src/lib.rs`, marking all code inside the file as library code. This makes it accessible to the whole project by importing it with the project name specified in the `cargo new` command.

Add the following imports to the file:

Expand Down Expand Up @@ -63,7 +63,7 @@ enum Operation<'a> {

Acquiring the lock on a mutex returns a [MutexGuard](https://doc.rust-lang.org/std/sync/struct.MutexGuard.html "https://doc.rust-lang.org/std/sync/struct.MutexGuard.html"), a thin wrapper around the value held by the mutex. The [lifetime specifier](https://doc.rust-lang.org/book/ch10-03-lifetime-syntax.html "https://doc.rust-lang.org/book/ch10-03-lifetime-syntax.html") `<’a>`  in the type definition indicates how long the boolean flags are going to stay in memory. They are now associated with the enum variants and their held locks will be unlocked when the enum variants go out of scope.

We can see Rust’s powerful enums here, as we can add data on individual variants like we would do with a struct.
We can see Rust’s powerful enums here, as we can add data on individual variants like we would with a struct.

## Defining Producer and Consumer Logic

Expand Down Expand Up @@ -117,7 +117,7 @@ impl<T> Drop for Producer<T> {

We set `elements_processed` value to `true`, indicating that the producer has processed all its elements and is going out of scope. The `Drop` trait ensures that this implementation detail remains associated with the producer.

The `store` method requires a memory ordering, which defines how the memory is organized and ensures that our code avoids race conditions and improper data access across threads. We use the strongest possible ordering, `SeqCst`.
The `store` method requires a memory ordering, which defines how application memory is organized and ensures that our code avoids race conditions and improper data access across threads. We use the strongest possible ordering, `SeqCst`.

### Consumer

Expand Down Expand Up @@ -186,7 +186,7 @@ pub fn buffered_queue<T>(mut capacity: usize) -> (Producer<T>, Consumer<T>) {
}
```

`buffered_queue` takes a capacity and returns a tuple of Producer and Consumer types. It uses 1 as default if the capacity is 0, wraps the buffered queue value in Arc for cheap referencing and thread-safety, makes a reference copy and passes the Arc instances to Producer and Consumer.
`buffered_queue` takes a capacity and returns a tuple of Producer and Consumer types. It uses 1 as default if the capacity is 0, wraps the buffered queue value in Arc for cheap referencing and thread-safety, makes a reference copy and passes the Arc instances to Producer and Consumer types.

Now we will implement its methods:

Expand Down Expand Up @@ -238,7 +238,7 @@ impl<T> BufferedQueue<T> {

This method accepts the internal queue and operation enum types. `queue` defines the double-ended queue value after acquiring its mutex lock.

We match the operation value and define the associated boolean values as mutable. Rust allows us to shorthand values if the variable name matches the field name, so we can write `{ mut is_full_flag: is_full_flag }` as  `{ mut is_full_flag }` and so on.
We match the operation variants and define their associated boolean values as mutable. Rust allows us to shorthand values if the variable name matches the field name, so we can write `{ mut is_full_flag: is_full_flag }` as  `{ mut is_full_flag }` and so on.

The method checks whether the queue’s state has changed: after an element `Push`, whether the queue is now full and whether it was empty earlier, after an element `Pop`, whether the queue is now empty and whether it was full before. It notifies waiting threads on the state changes if these conditions match, by calling the Condvars’ `notify_all` method.

Expand Down Expand Up @@ -340,4 +340,4 @@ exhausted queue, terminating consumer!

This was a rewarding exercise for me, as it helped me get more familiar with Rust and concurrency concepts in general. You can find the full code for the exercise [here](https://github.com/dhruv-ahuja/buffered-queue-rs "https://github.com/dhruv-ahuja/buffered-queue-rs"), there are some differences in the code shown here and in the repo.

Thanks for reading my post, any feedback or advice would be appreciated! You can write to me at [dhruvahuja2k@gmail.com](mailto:dhruvahuja2k@gmail.com "mailto:dhruvahuja2k@gmail.com").
Thanks for reading my post, any feedback or advice would be appreciated! You can write to me at [my email](mailto:dhruvahuja2k@gmail.com "mailto:dhruvahuja2k@gmail.com").

0 comments on commit 5cd6d25

Please sign in to comment.