Skip to content

Commit

Permalink
Impl FindCoordinator endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
thedodd committed May 14, 2024
1 parent 1ac7077 commit fa2ad89
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kafka-rs"
description = "Native Rust Kafka client, built upon kafka-protocol-rs and Tokio."
version = "0.5.0-rc.1"
version = "0.5.0-rc.2"
edition = "2021"
license = "MIT/Apache-2.0"
authors = ["Anthony Dodd <dodd.anthonyjosiah@gmail.com>"]
Expand Down
36 changes: 35 additions & 1 deletion src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{collections::BTreeMap, time::Duration};
use anyhow::{Context, Result};
use bytes::Bytes;
use futures::prelude::*;
use kafka_protocol::messages::{FetchRequest, ListOffsetsRequest};
use kafka_protocol::messages::{FetchRequest, FindCoordinatorRequest, ListOffsetsRequest};
use kafka_protocol::{
messages::{metadata_response::MetadataResponseBroker, ApiKey, ApiVersionsRequest, MetadataRequest, ProduceRequest, RequestHeader, RequestKind, ResponseHeader, ResponseKind},
protocol::{Decodable, Message, Request as RequestProto, VersionRange},
Expand Down Expand Up @@ -40,6 +40,7 @@ pub(crate) enum Msg {
GetMetadata(uuid::Uuid, ResponseChannel, bool),
Produce(uuid::Uuid, ProduceRequest, MsgTx),
Fetch(uuid::Uuid, FetchRequest, ResponseChannel),
FindCoordinator(uuid::Uuid, FindCoordinatorRequest, ResponseChannel),
ListOffsets(uuid::Uuid, ListOffsetsRequest, ResponseChannel),
}

Expand Down Expand Up @@ -88,6 +89,11 @@ impl Broker {
pub(crate) async fn fetch<T: Into<ResponseChannel>>(&self, id: uuid::Uuid, req: FetchRequest, tx: T) {
let _ = self.chan.send(Msg::Fetch(id, req, tx.into())).await; // Unreachable error case.
}

/// Submit a FindCoordinator request to the broker.
pub(crate) async fn find_coordinator<T: Into<ResponseChannel>>(&self, id: uuid::Uuid, req: FindCoordinatorRequest, tx: T) {
let _ = self.chan.send(Msg::FindCoordinator(id, req, tx.into())).await; // Unreachable error case.
}
}

/// All possible states in which a broker connection may exist.
Expand Down Expand Up @@ -157,6 +163,7 @@ impl BrokerTask {
Msg::GetMetadata(id, tx, internal) => self.get_metadata(writer, id, Some(tx), internal).await,
Msg::Produce(id, req, tx) => self.produce(writer, id, req, Some(tx)).await,
Msg::Fetch(id, req, tx) => self.fetch(writer, id, req, Some(tx)).await,
Msg::FindCoordinator(id, req, tx) => self.find_coordinator(writer, id, req, Some(tx)).await,
Msg::ListOffsets(id, req, tx) => self.list_offsets(writer, id, req, Some(tx)).await,
}
}
Expand Down Expand Up @@ -318,6 +325,33 @@ impl BrokerTask {
self.write(writer, correlation_id, req).await
}

async fn find_coordinator(&mut self, writer: &mut KafkaWriter, id: uuid::Uuid, req: FindCoordinatorRequest, chan: Option<ResponseChannel>) -> Result<()> {
let correlation_id = self.next_correlation_id;
self.next_correlation_id = self.next_correlation_id.wrapping_add(1);

let (min, max) = self.api_versions.get(&FindCoordinatorRequest::KEY).copied().unwrap_or((0, 0));
let supported_versions = FindCoordinatorRequest::VERSIONS.intersect(&VersionRange { min, max });
tracing::debug!(?supported_versions, correlation_id, "sending find coordinator request");

let api_key = ApiKey::FindCoordinatorKey;
let mut header = RequestHeader::default();
header.request_api_key = api_key as i16;
header.request_api_version = supported_versions.max;
header.correlation_id = correlation_id;

let req = OutboundRequest {
id,
request: Request {
header,
kind: RequestKind::FindCoordinatorRequest(req),
},
api_version: supported_versions.max,
api_key,
chan,
};
self.write(writer, correlation_id, req).await
}

/// Handle a response from the broker.
async fn handle_response(&mut self, mut resp: Response) {
// Decode header.
Expand Down
89 changes: 67 additions & 22 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Kafka client implementation.

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use bytes::{Bytes, BytesMut};
use kafka_protocol::{
Expand All @@ -9,15 +9,16 @@ use kafka_protocol::{
fetch_request::{FetchPartition, FetchTopic},
list_offsets_request::{ListOffsetsPartition, ListOffsetsTopic},
produce_request::PartitionProduceData,
FetchRequest, ListOffsetsRequest, MetadataResponse, ProduceRequest, ResponseHeader, ResponseKind,
FetchRequest, FindCoordinatorRequest, FindCoordinatorResponse, ListOffsetsRequest, MetadataResponse, ProduceRequest, ResponseHeader, ResponseKind,
},
protocol::StrBytes,
records::{Compression, Record, RecordBatchDecoder, RecordBatchEncoder, RecordEncodeOptions, TimestampType},
ResponseError,
};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::{CancellationToken, DropGuard};

use crate::clitask::{ClientTask, ClusterMeta, MetadataPolicy, Msg};
use crate::clitask::{ClientTask, Cluster, ClusterMeta, MetadataPolicy, Msg};
use crate::error::ClientError;
#[cfg(feature = "internal")]
use crate::internal::InternalClient;
Expand Down Expand Up @@ -95,13 +96,7 @@ impl Client {

/// Get cluster metadata from the broker specified by ID.
pub async fn get_metadata(&self, broker_id: i32) -> ClientResult<MetadataResponse> {
let mut cluster = self.cluster.load();
if !*cluster.bootstrap.borrow() {
let mut sig = cluster.bootstrap.clone();
let _ = sig.wait_for(|val| *val).await; // Ensure the cluster metadata is bootstrapped.
cluster = self.cluster.load();
}

let cluster = self.get_cluster_metadata_cache().await?;
let broker = cluster
.brokers
.get(&broker_id)
Expand All @@ -126,12 +121,7 @@ impl Client {

/// List topic partition offsets.
pub async fn list_offsets(&self, topic: StrBytes, ptn: i32, pos: ListOffsetsPosition) -> ClientResult<i64> {
let mut cluster = self.cluster.load();
if !*cluster.bootstrap.borrow() {
let mut sig = cluster.bootstrap.clone();
let _ = sig.wait_for(|val| *val).await; // Ensure the cluster metadata is bootstrapped.
cluster = self.cluster.load();
}
let cluster = self.get_cluster_metadata_cache().await?;

// Get the broker responsible for the target topic/partition.
let topic_ptns = cluster.topics.get(&topic).ok_or(ClientError::UnknownTopic(topic.to_string()))?;
Expand Down Expand Up @@ -182,12 +172,7 @@ impl Client {

/// Fetch a batch of records from the target topic partition.
pub async fn fetch(&self, topic: StrBytes, ptn: i32, start: i64) -> ClientResult<Option<Vec<Record>>> {
let mut cluster = self.cluster.load();
if !*cluster.bootstrap.borrow() {
let mut sig = cluster.bootstrap.clone();
let _ = sig.wait_for(|val| *val).await; // Ensure the cluster metadata is bootstrapped.
cluster = self.cluster.load();
}
let cluster = self.get_cluster_metadata_cache().await?;

// Get the broker responsible for the target topic/partition.
let topic_ptns = cluster.topics.get(&topic).ok_or(ClientError::UnknownTopic(topic.to_string()))?;
Expand Down Expand Up @@ -241,6 +226,66 @@ impl Client {
Ok(Some(records))
}

/// Get the cached cluster metadata.
///
/// If the cluster metadata has not yet been bootstrapped, then this routine will wait for
/// a maximum of 10s for the metadata to be bootstrapped, and will then timeout.
async fn get_cluster_metadata_cache(&self) -> ClientResult<Arc<Cluster>> {
let mut cluster = self.cluster.load();
if !*cluster.bootstrap.borrow() {
let mut sig = cluster.bootstrap.clone();
let _ = tokio::time::timeout(Duration::from_secs(10), sig.wait_for(|val| *val))
.await
.map_err(|_err| ClientError::ClusterMetadataTimeout)?
.map_err(|_err| ClientError::ClusterMetadataTimeout)?;
cluster = self.cluster.load();
}
Ok(cluster.clone())
}

/// Find the coordinator for the given group.
pub async fn find_coordinator(&self, key: StrBytes, key_type: i8, broker_id: Option<i32>) -> ClientResult<FindCoordinatorResponse> {
let cluster = self.get_cluster_metadata_cache().await?;

// Get the specified broker connection, else get the first available.
let broker = broker_id
.and_then(|id| cluster.brokers.get(&id).cloned())
.or_else(|| {
// TODO: get random broker.
cluster.brokers.first_key_value().map(|(_, broker)| broker.clone())
})
.ok_or_else(|| ClientError::NoBrokerFound)?;

// Build request.
let uid = uuid::Uuid::new_v4();
let mut req = FindCoordinatorRequest::default();
req.key = key;
req.key_type = key_type;

// Send request.
let (tx, rx) = oneshot::channel();
broker.conn.find_coordinator(uid, req, tx).await;

// Unpack response & handle errors.
unpack_broker_response(rx)
.await
.and_then(|(_, res)| {
tracing::debug!("res: {:?}", res);
if let ResponseKind::FindCoordinatorResponse(res) = res {
Ok(res)
} else {
Err(ClientError::MalformedResponse)
}
})
.and_then(|res| {
// Handle broker response codes.
if res.error_code != 0 {
return Err(ClientError::ResponseError(res.error_code, ResponseError::try_from_code(res.error_code), res.error_message));
}
Ok(res)
})
}

/// Build a producer for a topic.
pub fn topic_producer(&self, topic: &str, acks: Acks, timeout_ms: Option<i32>, compression: Option<Compression>) -> TopicProducer {
let (tx, rx) = mpsc::unbounded_channel();
Expand Down
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::StrBytes;
pub type ClientResult<T> = std::result::Result<T, ClientError>;

/// Client errors from interacting with a Kafka cluster.
///
/// TODO: probably just refactor this into an opaque Retryable and Fatal errors, which just dump info on debug.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
/// Error while interacting with a broker.
Expand Down Expand Up @@ -37,6 +39,12 @@ pub enum ClientError {
/// An error was returned in a response from a broker.
#[error("an error was returned in a response from a broker: {0} {1:?} {2:?}")]
ResponseError(i16, Option<ResponseError>, Option<StrBytes>),
/// Timeout while waiting for cluster metadata to bootstrap.
#[error("timeout while waiting for cluster metadata to bootstrap")]
ClusterMetadataTimeout,
/// Could not find a broker specified by ID, or any broker at all.
#[error("could not find a broker specified by ID, or any broker at all")]
NoBrokerFound,
#[error("{0}")]
Other(String),
}
Expand Down
2 changes: 1 addition & 1 deletion src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{Client, StrBytes};
#[derive(Clone)]
pub struct InternalClient {
/// A handle to the underlying client.
client: Client,
pub client: Client,
}

impl InternalClient {
Expand Down

0 comments on commit fa2ad89

Please sign in to comment.