diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java index ee48e3a30b..052b340ab1 100644 --- a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java +++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleConsumer.java @@ -102,7 +102,7 @@ public void init(Context context) { private ShuffleClientMetrics createMetrics( org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID, JobConf jobConf) throws NoSuchMethodException { - // for hadoop 3 + // for hadoop 3.1+ see MAPREDUCE-6861 try { return DynMethods.builder("create") .impl( @@ -114,6 +114,16 @@ private ShuffleClientMetrics createMetrics( } catch (Exception e) { // ignore this exception because the createMetrics might use hadoop2 } + + // for hadoop 3.1 see MAPREDUCE-6526 + try { + return DynMethods.builder("create") + .impl(ShuffleClientMetrics.class) + .buildStaticChecked() + .invoke(taskAttemptID, jobConf); + } catch (Exception e) { + } + // for hadoop 2 return DynConstructors.builder(ShuffleClientMetrics.class) .hiddenImpl(new Class[] {org.apache.hadoop.mapreduce.TaskAttemptID.class, JobConf.class}) diff --git a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java index 3944869f80..9f807e1654 100644 --- a/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java +++ b/client-mr/mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/CelebornShuffleFetcher.java @@ -78,13 +78,17 @@ public void fetchAndMerge() { // If merge is on, block merger.waitForResource(); // Do shuffle - metrics.threadBusy(); + if (metrics != null) { + metrics.threadBusy(); + } // read blocks fetchToLocalAndMerge(); } catch (Exception e) { logger.error("Celeborn shuffle fetcher fetch data failed.", e); } finally { - metrics.threadFree(); + if (metrics != null) { + metrics.threadFree(); + } } } } @@ -134,7 +138,9 @@ private void fetchToLocalAndMerge() throws IOException { reporter.progress(); } else { celebornInputStream.close(); - metrics.inputBytes(inputShuffleSize); + if (metrics != null) { + metrics.inputBytes(inputShuffleSize); + } logger.info("reduce task {} read {} bytes", reduceId, inputShuffleSize); stopped = true; }