Skip to content

Commit

Permalink
fix the data pull problem when use alluxio as fs.
Browse files Browse the repository at this point in the history
  • Loading branch information
tanyajun committed Oct 17, 2024
1 parent a1aa0de commit 30ac4c8
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,18 @@
import java.util.List;
import java.util.Optional;

import static io.trino.filesystem.alluxio.AlluxioUtils.convertToLocation;
import static java.util.Objects.requireNonNull;

public class AlluxioFileIterator
implements FileIterator
{
private final Iterator<URIStatus> files;
private final String mountRoot;
private final Location pathLocation;

public AlluxioFileIterator(List<URIStatus> files, String mountRoot)
public AlluxioFileIterator(List<URIStatus> files, Location pathLocation)
{
this.files = requireNonNull(files.iterator(), "files is null");
this.mountRoot = requireNonNull(mountRoot, "mountRoot is null");
this.pathLocation = requireNonNull(pathLocation, "pathLocation is null");
}

@Override
Expand All @@ -54,7 +53,9 @@ public FileEntry next()
return null;
}
URIStatus fileStatus = files.next();
Location location = convertToLocation(fileStatus.getPath(), mountRoot);
String filePath = fileStatus.getPath();
String fileName = filePath.substring(filePath.lastIndexOf('/') + 1);
Location location = pathLocation.appendSuffix("/" + fileName);
return new FileEntry(
location,
fileStatus.getLength(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,20 +189,20 @@ public FileIterator listFiles(Location location)
try {
URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot));
if (status == null) {
new AlluxioFileIterator(Collections.emptyList(), mountRoot);
new AlluxioFileIterator(Collections.emptyList(), location);
}
if (!status.isFolder()) {
throw new IOException("Location is not a directory: %s".formatted(location));
}
}
catch (NotFoundRuntimeException | AlluxioException e) {
return new AlluxioFileIterator(Collections.emptyList(), mountRoot);
return new AlluxioFileIterator(Collections.emptyList(), location);
}

try {
List<URIStatus> filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot),
ListStatusPOptions.newBuilder().setRecursive(true).build());
return new AlluxioFileIterator(filesStatus.stream().filter(status -> !status.isFolder() & status.isCompleted()).toList(), mountRoot);
return new AlluxioFileIterator(filesStatus.stream().filter(status -> !status.isFolder() & status.isCompleted()).toList(), location);
}
catch (AlluxioException e) {
throw new IOException("Error listFiles %s".formatted(location), e);
Expand Down

0 comments on commit 30ac4c8

Please sign in to comment.