Skip to content

Commit

Permalink
KAFKA-15363: Broker log directory failure changes (apache#14790)
Browse files Browse the repository at this point in the history
Part of JBOD KIP-858, https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft

Reviewers: Igor Soarez <i@soarez.me>, Colin P. McCabe <cmccabe@apache.org>, Ron Dagostino <rdagostino@confluent.io>
  • Loading branch information
OmniaGM authored Dec 8, 2023
1 parent 5ba7bfa commit ec92410
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 31 deletions.
18 changes: 13 additions & 5 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,16 @@ class Partition(val topicPartition: TopicPartition,
// Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
// from its partitionStates if this method returns true
def maybeReplaceCurrentWithFutureReplica(): Boolean = {
// lock to prevent the log append by followers while checking if the log dir could be replaced with future log.
runCallbackIfFutureReplicaCaughtUp((futurePartitionLog: UnifiedLog) => {
logManager.replaceCurrentWithFutureLog(topicPartition)
futurePartitionLog.setLogOffsetsListener(logOffsetsListener)
log = futureLog
removeFutureLocalReplica(false)
})
}

def runCallbackIfFutureReplicaCaughtUp(callback: UnifiedLog => Unit): Boolean = {
// lock to prevent the log append by followers while checking if future log caught-up.
futureLogLock.synchronized {
val localReplicaLEO = localLogOrException.logEndOffset
val futureReplicaLEO = futureLog.map(_.logEndOffset)
Expand All @@ -624,10 +633,7 @@ class Partition(val topicPartition: TopicPartition,
futureLog match {
case Some(futurePartitionLog) =>
if (log.exists(_.logEndOffset == futurePartitionLog.logEndOffset)) {
logManager.replaceCurrentWithFutureLog(topicPartition)
futurePartitionLog.setLogOffsetsListener(logOffsetsListener)
log = futureLog
removeFutureLocalReplica(false)
callback(futurePartitionLog)
true
} else false
case None =>
Expand All @@ -642,6 +648,8 @@ class Partition(val topicPartition: TopicPartition,
}
}

def futureReplicaDirectoryId(): Option[Uuid] = futureLog.flatMap(log => logManager.directoryId(log.dir.getParent))
def logDirectoryId(): Option[Uuid] = log.flatMap(log => logManager.directoryId(log.dir.getParent))
/**
* Delete the partition. Note that deleting the partition does not delete the underlying logs.
* The logs are deleted by the ReplicaManager after having deleted the partition.
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,9 @@ class BrokerServer(
() => lifecycleManager.brokerEpoch
)
val directoryEventHandler = new DirectoryEventHandler {
override def handleAssignment(partition: TopicIdPartition, directoryId: Uuid): Unit =
assignmentsManager.onAssignment(partition, directoryId)
override def handleAssignment(partition: TopicIdPartition, directoryId: Uuid, callback: Runnable): Unit =
assignmentsManager.onAssignment(partition, directoryId, callback)

override def handleFailure(directoryId: Uuid): Unit =
lifecycleManager.propagateDirectoryFailure(directoryId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package kafka.server

import kafka.cluster.BrokerEndPoint
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.common.DirectoryEventHandler

class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
replicaManager: ReplicaManager,
quotaManager: ReplicationQuotaManager,
brokerTopicStats: BrokerTopicStats)
brokerTopicStats: BrokerTopicStats,
directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP
)
extends AbstractFetcherManager[ReplicaAlterLogDirsThread](
name = s"ReplicaAlterLogDirsManager on broker ${brokerConfig.brokerId}",
clientId = "ReplicaAlterLogDirs",
Expand All @@ -33,7 +36,7 @@ class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
val threadName = s"ReplicaAlterLogDirsThread-$fetcherId"
val leader = new LocalLeaderEndPoint(sourceBroker, brokerConfig, replicaManager, quotaManager)
new ReplicaAlterLogDirsThread(threadName, leader, failedPartitions, replicaManager,
quotaManager, brokerTopicStats, brokerConfig.replicaFetchBackoffMs)
quotaManager, brokerTopicStats, brokerConfig.replicaFetchBackoffMs, directoryEventHandler)
}

override protected def addPartitionsToFetcherThread(fetcherThread: ReplicaAlterLogDirsThread,
Expand Down
79 changes: 75 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package kafka.server

import kafka.cluster.Partition
import kafka.server.ReplicaAlterLogDirsThread.{DirectoryEventRequestState, QUEUED}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.{DirectoryEventHandler, OffsetAndEpoch, TopicIdPartition}
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}

import java.util.concurrent.ConcurrentHashMap
import scala.collection.{Map, Set}

class ReplicaAlterLogDirsThread(name: String,
Expand All @@ -30,7 +33,9 @@ class ReplicaAlterLogDirsThread(name: String,
replicaMgr: ReplicaManager,
quota: ReplicationQuotaManager,
brokerTopicStats: BrokerTopicStats,
fetchBackOffMs: Int)
fetchBackOffMs: Int,
directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
)
extends AbstractFetcherThread(name = name,
clientId = name,
leader = leader,
Expand All @@ -40,6 +45,8 @@ class ReplicaAlterLogDirsThread(name: String,
isInterruptible = false,
brokerTopicStats) {

private val assignmentRequestStates: ConcurrentHashMap[TopicPartition, DirectoryEventRequestState] = new ConcurrentHashMap()

override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
}
Expand Down Expand Up @@ -76,13 +83,70 @@ class ReplicaAlterLogDirsThread(name: String,
futureLog.updateHighWatermark(partitionData.highWatermark)
futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented)

if (partition.maybeReplaceCurrentWithFutureReplica())
removePartitions(Set(topicPartition))
directoryEventHandler match {
case DirectoryEventHandler.NOOP =>
if (partition.maybeReplaceCurrentWithFutureReplica())
removePartitions(Set(topicPartition))
case _ =>
maybePromoteFutureReplica(topicPartition, partition)
}

quota.record(records.sizeInBytes)
logAppendInfo
}

override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
// Schedule assignment request to revert any queued request before cancelling
for {
topicPartition <- topicPartitions
partitionState <- partitionAssignmentRequestState(topicPartition)
if partitionState == QUEUED
partition = replicaMgr.getPartitionOrException(topicPartition)
topicId <- partition.topicId
directoryId <- partition.logDirectoryId()
topicIdPartition = new TopicIdPartition(topicId, topicPartition.partition())
} directoryEventHandler.handleAssignment(topicIdPartition, directoryId, () => ())

super.removePartitions(topicPartitions)
}

// Visible for testing
private[server] def updatedAssignmentRequestState(topicPartition: TopicPartition)(state: ReplicaAlterLogDirsThread.DirectoryEventRequestState): Unit = {
assignmentRequestStates.put(topicPartition, state)
}

private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
val topicId = partition.topicId
if (topicId.isEmpty)
throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")

partitionAssignmentRequestState(topicPartition) match {
case None =>
// Schedule assignment request and don't promote the future replica yet until the controller has accepted the request.
partition.runCallbackIfFutureReplicaCaughtUp(_ => {
partition.futureReplicaDirectoryId()
.map(id => {
directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), id,
() => updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.COMPLETED))
// mark the assignment request state as queued.
updatedAssignmentRequestState(topicPartition)(ReplicaAlterLogDirsThread.QUEUED)
})
})
case Some(ReplicaAlterLogDirsThread.COMPLETED) =>
// Promote future replica if controller accepted the request and the replica caught-up with the original log.
if (partition.maybeReplaceCurrentWithFutureReplica()) {
removePartitions(Set(topicPartition))
assignmentRequestStates.remove(topicPartition)
}
case _ =>
log.trace("Waiting for AssignmentRequest to succeed before promoting the future replica.")
}
}

private def partitionAssignmentRequestState(topicPartition: TopicPartition): Option[DirectoryEventRequestState] = {
Option(assignmentRequestStates.get(topicPartition))
}

override def addPartitions(initialFetchStates: Map[TopicPartition, InitialFetchState]): Set[TopicPartition] = {
partitionMapLock.lockInterruptibly()
try {
Expand Down Expand Up @@ -123,3 +187,10 @@ class ReplicaAlterLogDirsThread(name: String,
partition.truncateFullyAndStartAt(offset, isFuture = true)
}
}
object ReplicaAlterLogDirsThread {
sealed trait DirectoryEventRequestState

case object QUEUED extends DirectoryEventRequestState

case object COMPLETED extends DirectoryEventRequestState
}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2370,7 +2370,7 @@ class ReplicaManager(val config: KafkaConfig,
throw new IllegalStateException(s"Assignment for topic without ID: ${tp.topic()}")
case Some(topicId) =>
val topicIdPartition = new common.TopicIdPartition(topicId, tp.partition())
directoryEventHandler.handleAssignment(topicIdPartition, dirId)
directoryEventHandler.handleAssignment(topicIdPartition, dirId, () => ())
}
}
}
Expand Down Expand Up @@ -2419,7 +2419,7 @@ class ReplicaManager(val config: KafkaConfig,
}

protected def createReplicaAlterLogDirsManager(quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats)
new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats, directoryEventHandler)
}

protected def createReplicaSelector(): Option[ReplicaSelector] = {
Expand Down
Loading

0 comments on commit ec92410

Please sign in to comment.