Skip to content

Commit

Permalink
[#1086] [Doc] Simplify the Gluten code and add the doc (#1322)
Browse files Browse the repository at this point in the history
* gluten integrate for branch-0.8

* spotless check

* add WriteBufferManagerTest test

* todo

* remove  addPartition method, add some docs
  • Loading branch information
summaryzb authored Nov 27, 2023
1 parent cf25897 commit aa25cfa
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,16 @@ After applying the patch and rebuilding Spark, add the following configuration in the Spark conf
spark.dynamicAllocation.enabled true
```

### Support Spark Columnar Shuffle with Gluten
To support Spark columnar shuffle with Uniffle, use the Gluten client;
refer to the [Gluten Project](https://github.com/oap-project/gluten) for details.

Update Spark conf to enable integration of Uniffle with Gluten:
```
spark.plugins io.glutenproject.GlutenPlugin
spark.shuffle.manager org.apache.spark.shuffle.gluten.uniffle.GlutenRssShuffleManager
```

### Deploy MapReduce Client

1. Add client jar to the classpath of each NodeManager, e.g., <HADOOP>/share/hadoop/mapreduce/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private final Map<Integer, List<ShuffleServerInfo>> partitionToServers;
private final Set<ShuffleServerInfo> shuffleServersForData;
private final long[] partitionLengths;
private final boolean isMemoryShuffleEnabled;
private final Function<String, Boolean> taskFailureCallback;
private final Set<Long> blockIds = Sets.newConcurrentHashSet();

/** used by columnar rss shuffle writer implementation */
protected final long taskAttemptId;

protected final ShuffleWriteMetrics shuffleWriteMetrics;
protected final boolean isMemoryShuffleEnabled;

private final BlockingQueue<Object> finishEventQueue = new LinkedBlockingQueue<>();

Expand Down Expand Up @@ -213,7 +213,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}
}

private void writeImpl(Iterator<Product2<K, V>> records) {
protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
List<ShuffleBlockInfo> shuffleBlockInfos;
boolean isCombine = shuffleDependency.mapSideCombine();
Function1<V, C> createCombiner = null;
Expand Down Expand Up @@ -243,7 +243,7 @@ private void writeImpl(Iterator<Product2<K, V>> records) {
processShuffleBlockInfos(shuffleBlockInfos);
}
long checkStartTs = System.currentTimeMillis();
checkBlockSendResult(blockIds);
internalCheckBlockSendResult();
long commitStartTs = System.currentTimeMillis();
long checkDuration = commitStartTs - checkStartTs;
if (!isMemoryShuffleEnabled) {
Expand Down Expand Up @@ -309,6 +309,10 @@ protected List<CompletableFuture<Long>> postBlockEvent(
return futures;
}

  /**
   * Waits for the outcome of all pending block sends by delegating to {@link
   * #checkBlockSendResult(Set)} with this writer's accumulated {@code blockIds}.
   *
   * <p>Extracted as a protected hook so columnar shuffle writer subclasses (e.g. the Gluten
   * integration) can override how send results are verified without duplicating the block-id
   * bookkeeping.
   */
  protected void internalCheckBlockSendResult() {
    checkBlockSendResult(blockIds);
  }

@VisibleForTesting
protected void checkBlockSendResult(Set<Long> blockIds) {
boolean interrupted = false;
Expand Down

0 comments on commit aa25cfa

Please sign in to comment.