Fix OOM caused by queue pressure #377

Closed
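In short: the per-topic buffers between record ingestion (DebeziumChangeEventCapture, ClickHouseSinkTask) and the ClickHouse batch writer were unbounded ConcurrentLinkedQueues, so whenever ClickHouse fell behind the producers, records piled up until the JVM ran out of memory. This change caps each buffer at 200,000 records with a LinkedBlockingQueue, makes producers back off while a queue is full, and drains records into a separate batch buffer before flushing.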
DebeziumChangeEventCapture.java
@@ -38,8 +38,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -59,7 +60,7 @@ public class DebeziumChangeEventCapture {
     private ClickHouseBatchRunnable runnable;
 
     // Records grouped by Topic Name
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
+    private final ConcurrentHashMap<String, Queue<ClickHouseStruct>> records = new ConcurrentHashMap<>();
 
 
     private BaseDbWriter writer = null;
@@ -198,14 +199,8 @@ private void processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord
             } catch(Exception e) {
                 log.error("Error retrieving status metrics");
             }
-            ConcurrentLinkedQueue<ClickHouseStruct> queue = new ConcurrentLinkedQueue<ClickHouseStruct>();
-            if (chStruct != null) {
-                queue.add(chStruct);
-            }
 
             synchronized (this.records) {
-                if (chStruct != null) {
-                    addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
-                }
+                addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
             }
         }

@@ -562,17 +557,22 @@ private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLPars
      * @param topicName
      * @param chs
      */
-    private void addRecordsToSharedBuffer(String topicName, ClickHouseStruct chs) {
-        ConcurrentLinkedQueue<ClickHouseStruct> structs;
+    private void addRecordsToSharedBuffer(String topicName, ClickHouseStruct chs) throws InterruptedException {
+        Queue<ClickHouseStruct> structs;
 
         if (this.records.containsKey(topicName)) {
             structs = this.records.get(topicName);
         } else {
-            structs = new ConcurrentLinkedQueue<>();
+            structs = new LinkedBlockingQueue<>(200_000);
             this.records.putIfAbsent(topicName, structs);
         }
-        structs.add(chs);
-        synchronized (this.records) {
-            this.records.put(topicName, structs);
+        while (true) {
+            try {
+                structs.add(chs);
+                break;
+            } catch (IllegalStateException e) {
+                Thread.sleep(50);
+            }
         }
     }
 
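Note on the pattern above: add on a full LinkedBlockingQueue throws IllegalStateException, which the loop turns into a 50 ms backoff. A minimal standalone sketch of the same backpressure idea (assuming the field were typed as BlockingQueue; the PR declares it as plain Queue) can use the timed offer instead:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch, not part of the PR: a bounded queue converts
// memory pressure into producer wait time. offer(e, timeout, unit)
// blocks until capacity frees up or the timeout expires, avoiding the
// add()/IllegalStateException/sleep spin.
class BoundedBufferSketch {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(200_000);

    void append(Object record) throws InterruptedException {
        // Retry until the record fits; each attempt waits up to 50 ms.
        while (!queue.offer(record, 50, TimeUnit.MILLISECONDS)) {
            // Still full: the batch flusher is behind; keep waiting.
        }
    }
}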
ClickHouseSinkTask.java
@@ -18,8 +18,9 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -39,7 +40,7 @@ public ClickHouseSinkTask() {
     private ClickHouseBatchExecutor executor;
 
     // Records grouped by Topic Name
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
+    private ConcurrentHashMap<String, Queue<ClickHouseStruct>> records;
 
     private DeDuplicator deduplicator;
 
@@ -115,16 +116,25 @@ public void put(Collection<SinkRecord> records) {
     }
 
     private void appendToRecords(String topicName, ClickHouseStruct chs) {
-        ConcurrentLinkedQueue<ClickHouseStruct> structs;
+        Queue<ClickHouseStruct> structs;
 
-        if(this.records.containsKey(topicName)) {
+        if (this.records.containsKey(topicName)) {
             structs = this.records.get(topicName);
         } else {
-            structs = new ConcurrentLinkedQueue<>();
+            structs = new LinkedBlockingQueue<>(200_000);
             this.records.putIfAbsent(topicName, structs);
         }
-        structs.add(chs);
-        synchronized (this.records) {
-            this.records.put(topicName, structs);
+        while (true) {
+            try {
+                structs.add(chs);
+                break;
+            } catch (IllegalStateException e) {
+                try {
+                    Thread.sleep(50);
+                } catch (InterruptedException ignored) {
+
+                }
+            }
         }
     }
 
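One caveat in the loop above: the empty catch (InterruptedException ignored) block swallows the interrupt, so a task being stopped may keep spinning until the record fits. A sketch of the usual remedy (not part of this PR) restores the interrupt flag so the caller can stop retrying:

// Illustrative helper, not in the PR: sleep without losing the interrupt.
// Returns false when interrupted so the retry loop can exit promptly.
static boolean backoff(long millis) {
    try {
        Thread.sleep(millis);
        return true;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the flag for shutdown
        return false;
    }
}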
DbWriter.java
@@ -27,12 +27,7 @@
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -237,8 +232,8 @@ private void updatePartitionOffsetMap(Map<TopicPartition, Long> offsetToPartitio
      * @param records
      * @return
      */
-    public Map<TopicPartition, Long> groupQueryWithRecords(ConcurrentLinkedQueue<ClickHouseStruct> records,
-                                                           Map<MutablePair<String, Map<String, Integer>>,
+    public Map<TopicPartition, Long> groupQueryWithRecords(Collection<ClickHouseStruct> records,
+                                                           Map<MutablePair<String, Map<String, Integer>>,
                                                            List<ClickHouseStruct>> queryToRecordsMap) {
 
 
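Widening the records parameter from ConcurrentLinkedQueue to Collection is what allows ClickHouseBatchRunnable (below) to pass in the drained ArrayDeque batch buffer rather than the live shared queue.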
@@ -468,7 +463,9 @@ public BlockMetaData addToPreparedStatementBatch(String topicName, Map<MutablePa
             success = true;
 
             long taskId = this.config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());
-            log.info("*************** EXECUTED BATCH Successfully " + "Records: " + recordsList.size() + "************** task(" + taskId + ")" + " Thread ID: " + Thread.currentThread().getName());
+            log.info("*************** EXECUTED BATCH Successfully "
+                    + "Topic: " + topicName + " "
+                    + "Records: " + recordsList.size() + " ************** task(" + taskId + ")" + " Thread ID: " + Thread.currentThread().getName());
 
             // ToDo: Clear is not an atomic operation.
             // It might delete the records that are inserted by the ingestion process.
ClickHouseBatchRunnable.java
@@ -15,11 +15,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * Runnable object that will be called on
@@ -28,7 +25,7 @@
  */
 public class ClickHouseBatchRunnable implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(ClickHouseBatchRunnable.class);
-    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
+    private final ConcurrentHashMap<String, Queue<ClickHouseStruct>> records;
 
     private final ClickHouseSinkConnectorConfig config;
 
@@ -44,11 +41,9 @@ public class ClickHouseBatchRunnable {
 
 
     // Map of topic name to buffered records.
-    Map<String, Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>>> topicToRecordsMap;
-
     private DBCredentials dbCredentials;
 
-    public ClickHouseBatchRunnable(ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records,
+    public ClickHouseBatchRunnable(ConcurrentHashMap<String, Queue<ClickHouseStruct>> records,
                                    ClickHouseSinkConnectorConfig config,
                                    Map<String, String> topic2TableMap) {
         this.records = records;
@@ -61,7 +56,6 @@ public ClickHouseBatchRunnable(ConcurrentHashMap<String, ConcurrentLinkedQueue<C
 
         //this.queryToRecordsMap = new HashMap<>();
         this.topicToDbWriterMap = new HashMap<>();
-        this.topicToRecordsMap = new HashMap<>();
 
         this.dbCredentials = parseDBConfiguration();
 
@@ -96,16 +90,46 @@ public void run() {
             }
 
             // Topic Name -> List of records
-            for (Map.Entry<String, ConcurrentLinkedQueue<ClickHouseStruct>> entry : this.records.entrySet()) {
-                if (entry.getValue().size() > 0) {
-                    processRecordsByTopic(entry.getKey(), entry.getValue());
+            for (Map.Entry<String, Queue<ClickHouseStruct>> entry : this.records.entrySet()) {
+                Queue<ClickHouseStruct> queue = entry.getValue();
+                while (!queue.isEmpty()) {
+                    Queue<ClickHouseStruct> buffer = this.moveRecordsToSeparateBuffer(entry.getValue());
+                    processRecordsByTopic(entry.getKey(), buffer);
                 }
             }
         } catch(Exception e) {
             log.error(String.format("ClickHouseBatchRunnable exception - Task(%s)", taskId), e);
         }
     }
 
+    private Queue<ClickHouseStruct> moveRecordsToSeparateBuffer(Queue<ClickHouseStruct> from) throws InterruptedException {
+        long timeMillis = System.currentTimeMillis();
+        Iterator<ClickHouseStruct> iterator = from.iterator();
+        int bufferSize = 100_000;
+        ArrayDeque<ClickHouseStruct> buffer = new ArrayDeque<>(bufferSize);
+        while (System.currentTimeMillis() - timeMillis < 5000) {
+            if (!iterator.hasNext()) {
+                break;
+            }
+            int counter = 0;
+            while (iterator.hasNext() && buffer.size() < bufferSize) {
+                buffer.add(iterator.next());
+                iterator.remove();
+                ++counter;
+            }
+            if (buffer.size() == bufferSize) {
+                break;
+            }
+            if (counter < 1000) { //probably fetching data from binlog by now (or small or really wide table)
+                break;
+            }
+            Thread.sleep(50);
+        }
+        log.info(String.format("Built new batch for processing in %d msec", System.currentTimeMillis() - timeMillis));
+
+        return buffer;
+    }
+
     /**
      * Function to retrieve table name from topic name
      *
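Since the per-topic queues are now LinkedBlockingQueues, an alternative sketch of the drain step (not what this PR ships) could rely on BlockingQueue.drainTo, which bulk-moves up to a maximum number of elements and frees queue capacity for blocked producers in a single call:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;

class DrainSketch {
    // Illustrative alternative to moveRecordsToSeparateBuffer: drainTo
    // transfers up to maxBatch elements into the buffer without the
    // iterate-and-remove loop above.
    static <T> Queue<T> drainBatch(BlockingQueue<T> from, int maxBatch) {
        ArrayDeque<T> buffer = new ArrayDeque<>(Math.min(maxBatch, from.size() + 1));
        from.drainTo(buffer, maxBatch);
        return buffer;
    }
}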
@@ -141,7 +165,7 @@ public DbWriter getDbWriterForTable(String topicName, String tableName, ClickHou
      * @param topicName
      * @param records
      */
-    private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<ClickHouseStruct> records) throws SQLException {
+    private void processRecordsByTopic(String topicName, Queue<ClickHouseStruct> records) throws SQLException {
 
         //The user parameter will override the topic mapping to table.
         String tableName = getTableFromTopic(topicName);
@@ -154,22 +178,12 @@ private void processRecordsByTopic(String topicName, ConcurrentLinkedQueue<Click
         // Step 1: The Batch Insert with preparedStatement in JDBC
         // works by forming the Query and then adding records to the Batch.
         // This step creates a Map of Query -> Records(List of ClickHouseStruct)
-        Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap;
-
-        if(topicToRecordsMap.containsKey(topicName)) {
-            queryToRecordsMap = topicToRecordsMap.get(topicName);
-        } else {
-            queryToRecordsMap = new HashMap<>();
-            topicToRecordsMap.put(topicName, queryToRecordsMap);
-        }
+        Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap = new HashMap<>();
 
         Map<TopicPartition, Long> partitionToOffsetMap = writer.groupQueryWithRecords(records, queryToRecordsMap);
         BlockMetaData bmd = new BlockMetaData();
 
-        if(flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd)) {
-            // Remove the entry.
-            queryToRecordsMap.remove(topicName);
-        }
+        flushRecordsToClickHouse(topicName, writer, queryToRecordsMap, bmd);
 
         if (this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET.toString())) {
             log.info("***** KAFKA OFFSET MANAGEMENT ENABLED *****");
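Worth noting why the old code here leaked: queryToRecordsMap was cached per topic in topicToRecordsMap, and after a successful flush the cleanup call queryToRecordsMap.remove(topicName) looked up a String key in a map keyed by MutablePair, so it never removed anything and flushed records stayed reachable across batches. Allocating a fresh map for every batch sidesteps that.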
ClickHouseBatchRunnableTest.java
@@ -13,13 +13,14 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class ClickHouseBatchRunnableTest {
 
 
-    ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>>
+    ConcurrentHashMap<String, Queue<ClickHouseStruct>>
         records = new ConcurrentHashMap<>();
     Map<String, String> topic2TableMap = new HashMap<>();
 