Skip to content

Commit

Permalink
more tweaks
Browse files — browse the repository at this point in the history
  • Loading branch information
msbarry committed Oct 30, 2023
1 parent 227fdc4 commit 8899689
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.google.common.util.concurrent.RateLimiter;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.stats.Counter;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.worker.RunnableThatThrows;
Expand All @@ -18,9 +19,7 @@
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand All @@ -33,7 +32,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,23 +65,28 @@ public class Downloader {
private static final Logger LOGGER = LoggerFactory.getLogger(Downloader.class);
private final PlanetilerConfig config;
private final List<ResourceToDownload> toDownloadList = new ArrayList<>();
private final HttpClient client = HttpClient.newBuilder()
// explicitly follow redirects to capture final redirect url
.followRedirects(HttpClient.Redirect.NEVER).build();
private final HttpClient client;
private final ExecutorService executor;
private final Stats stats;
private final long chunkSizeBytes;
private final ResourceUsage diskSpaceCheck = new ResourceUsage("download");
private final RateLimiter rateLimiter;
private final Semaphore concurrentDownloads;
private final Semaphore concurrentDiskWrites;

Check warning on line 75 in planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java

View workflow job for this annotation

GitHub Actions / Analyze with Sonar

MAJOR CODE_SMELL

Remove this unused "concurrentDiskWrites" private field. rule: java:S1068 (https://sonarcloud.io/organizations/onthegomap/rules?open=java%3AS1068&rule_key=java%3AS1068) issue url: https://sonarcloud.io/project/issues?pullRequest=702&open=AYuADdy1s5iTIkYNnA3n&id=onthegomap_planetiler

/**
 * Creates a downloader that runs each download task on its own virtual thread and limits the
 * number of simultaneous downloads to {@code config.downloadThreads()}.
 *
 * @param config         planetiler configuration (max bandwidth, download thread count, ...)
 * @param stats          stats collector for this run
 * @param chunkSizeBytes size of each ranged-request chunk when splitting large downloads
 */
Downloader(PlanetilerConfig config, Stats stats, long chunkSizeBytes) {
  // a max bandwidth of 0 means "unlimited": skip rate limiting entirely in that case
  this.rateLimiter = config.downloadMaxBandwidth() == 0 ? null : RateLimiter.create(config.downloadMaxBandwidth());
  this.chunkSizeBytes = chunkSizeBytes;
  this.config = config;
  this.stats = stats;
  // downloads are I/O-bound, so one lightweight virtual thread per task is cheap
  this.executor = Executors.newVirtualThreadPerTaskExecutor();
  this.client = HttpClient.newBuilder()
    // redirects are handled manually (Redirect.NEVER) so the final redirect url can be captured
    .followRedirects(HttpClient.Redirect.NEVER)
    .executor(executor)
    .build();
  this.concurrentDownloads = new Semaphore(config.downloadThreads());
  // NOTE(review): this semaphore is never acquired in the code visible here (flagged by Sonar
  // java:S1068 above) — confirm whether it should gate disk writes or be removed
  this.concurrentDiskWrites = new Semaphore(10);
}

public static Downloader create(PlanetilerConfig config, Stats stats) {
Expand Down Expand Up @@ -170,7 +173,8 @@ public void run() {
for (var toDownload : toDownloadList) {
try {
long size = toDownload.metadata.get(10, TimeUnit.SECONDS).size;
loggers.addStorageRatePercentCounter(toDownload.id, size, toDownload::bytesDownloaded, true);
loggers.addStorageRatePercentCounter(toDownload.id, size, toDownload::bytesDownloaded, true)
.addFileSize(toDownload.tmpPath());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Error getting size of " + toDownload.url, e);
Expand All @@ -188,6 +192,7 @@ CompletableFuture<Void> downloadIfNecessary(ResourceToDownload resourceToDownloa
LogUtil.setStage("download", resourceToDownload.id);
long existingSize = FileUtils.size(resourceToDownload.output);
var metadata = httpHeadFollowRedirects(resourceToDownload.url, 0);
Path tmpPath = resourceToDownload.tmpPath();
resourceToDownload.metadata.complete(metadata);
if (metadata.size == existingSize) {
LOGGER.info("Skipping {}: {} already up-to-date", resourceToDownload.id, resourceToDownload.output);
Expand All @@ -199,18 +204,18 @@ CompletableFuture<Void> downloadIfNecessary(ResourceToDownload resourceToDownloa
LOGGER.info("Downloading {}{} to {}", resourceToDownload.url, redirectInfo, resourceToDownload.output);
FileUtils.delete(resourceToDownload.output);
FileUtils.createParentDirectories(resourceToDownload.output);
Path tmpPath = resourceToDownload.tmpPath();
FileUtils.delete(tmpPath);
FileUtils.deleteOnExit(tmpPath);
diskSpaceCheck.addDisk(tmpPath, metadata.size, resourceToDownload.id);
diskSpaceCheck.checkAgainstLimits(config.force(), false);
httpDownload(resourceToDownload, tmpPath);
Files.move(tmpPath, resourceToDownload.output);
FileUtils.delete(tmpPath);
LOGGER.info("Finished downloading {} to {}", resourceToDownload.url, resourceToDownload.output);
} catch (Exception e) {

Check warning on line 214 in planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java

View workflow job for this annotation

GitHub Actions / Analyze with Sonar

MAJOR CODE_SMELL

Either log this exception and handle it, or rethrow it with some contextual information. rule: java:S2139 (https://sonarcloud.io/organizations/onthegomap/rules?open=java%3AS2139&rule_key=java%3AS2139) issue url: https://sonarcloud.io/project/issues?pullRequest=702&open=AYuADdy1s5iTIkYNnA3m&id=onthegomap_planetiler
LOGGER.error("Error downloading {} to {}", resourceToDownload.url, resourceToDownload.output, e);
throw e;
} finally {
FileUtils.delete(tmpPath);
}
}), executor);
}
Expand Down Expand Up @@ -247,61 +252,52 @@ ResourceMetadata httpHead(String url) throws IOException, InterruptedException {
}

private void httpDownload(ResourceToDownload resource, Path tmpPath)
throws ExecutionException, InterruptedException, IOException {
/*
* Alternative using async HTTP client:
*
* return client.sendAsync(newHttpRequest(url).GET().build(), responseInfo -> {
* assertOK(responseInfo);
* return HttpResponse.BodyHandlers.ofFile(path).apply(responseInfo);
*
* But it is slower on large files
*/
throws ExecutionException, InterruptedException {
var metadata = resource.metadata().get();
String canonicalUrl = metadata.canonicalUrl();
record Range(long start, long end) {

long size() {
return end - start;
}
}
record Range(long start, long end) {}
List<Range> chunks = new ArrayList<>();
boolean ranges = metadata.acceptRange && config.downloadThreads() > 1;
long chunkSize = ranges ? chunkSizeBytes : metadata.size;
for (long start = 0; start < metadata.size; start += chunkSize) {
long end = Math.min(start + chunkSize, metadata.size);
chunks.add(new Range(start, end));
}
Files.createFile(tmpPath);
Worker.joinFutures(chunks.stream().map(chunk -> CompletableFuture.runAsync(RunnableThatThrows.wrap(() -> {
FileUtils.setLength(tmpPath, metadata.size);
Worker.joinFutures(chunks.stream().map(range -> CompletableFuture.runAsync(RunnableThatThrows.wrap(() -> {
LogUtil.setStage("download", resource.id);
concurrentDownloads.acquire();
try (var fileChannel = FileChannel.open(tmpPath, WRITE)) {
var range = chunk;
while (range.size() > 0) {
try (
var inputStream = (ranges || range.start > 0) ? openStreamRange(canonicalUrl, range.start, range.end) :
openStream(canonicalUrl);
var input = new ProgressChannel(Channels.newChannel(inputStream), resource.progress, rateLimiter)
) {
// ensure this file has been allocated up to the start of this block
fileChannel.write(ByteBuffer.allocate(1), range.start);
fileChannel.position(range.start);
long transferred = fileChannel.transferFrom(input, range.start, range.size());
if (transferred == 0) {
throw new IOException("Transferred 0 bytes but " + range.size() + " expected: " + canonicalUrl);
} else if (transferred != range.size() && !metadata.acceptRange) {
throw new IOException(
"Transferred " + transferred + " bytes but " + range.size() + " expected: " + canonicalUrl +
" and server does not support range requests");
var counter = resource.progress.counterForThread();
try (
var fc = FileChannel.open(tmpPath, WRITE);
var inputStream = (ranges || range.start > 0) ?
openStreamRange(canonicalUrl, range.start, range.end) :
openStream(canonicalUrl);
) {
long offset = range.start;
byte[] buffer = new byte[16384];
int read;
while (offset < range.end && (read = inputStream.read(buffer, 0, 16384)) >= 0) {
counter.incBy(read);
if (rateLimiter != null) {
rateLimiter.acquire(read);
}
int position = 0;
int remaining = read;
while (remaining > 0) {
int written = fc.write(ByteBuffer.wrap(buffer, position, remaining), offset);
if (written <= 0) {
throw new IOException("Failed to write to " + tmpPath);
}
range = new Range(range.start + transferred, range.end);
position += written;
remaining -= written;
offset += written;
}
}
} finally {
concurrentDownloads.release();
}
}), executor)).toArray(CompletableFuture[]::new));
}), executor)).toArray(CompletableFuture[]::new)).get();
}

private HttpRequest.Builder newHttpRequest(String url) {
Expand All @@ -313,11 +309,12 @@ private HttpRequest.Builder newHttpRequest(String url) {
/**
 * Metadata about a remote resource obtained from a HEAD request: the redirect target (if the
 * server responded with one), the final canonical URL to fetch, the content length in bytes, and
 * whether the server accepts HTTP range requests.
 */
record ResourceMetadata(Optional<String> redirect, String canonicalUrl, long size, boolean acceptRange) {}

record ResourceToDownload(
String id, String url, Path output, CompletableFuture<ResourceMetadata> metadata, AtomicLong progress
String id, String url, Path output, CompletableFuture<ResourceMetadata> metadata,
Counter.MultiThreadCounter progress
) {

ResourceToDownload(String id, String url, Path output) {
this(id, url, output, new CompletableFuture<>(), new AtomicLong(0));
this(id, url, output, new CompletableFuture<>(), Counter.newMultiThreadCounter());
}

public Path tmpPath() {
Expand All @@ -328,33 +325,4 @@ public long bytesDownloaded() {
return progress.get();
}
}

/**
 * A {@link ReadableByteChannel} decorator that records how many bytes have been read into
 * {@code progress} and, when {@code rateLimiter} is non-null, throttles throughput through it.
 */
private record ProgressChannel(ReadableByteChannel inner, AtomicLong progress, RateLimiter rateLimiter)
  implements ReadableByteChannel {

  @Override
  public int read(ByteBuffer dst) throws IOException {
    int bytesRead = inner.read(dst);
    if (bytesRead <= 0) {
      // end-of-stream (-1) or nothing read: no progress to record, no permits to acquire
      return bytesRead;
    }
    if (rateLimiter != null) {
      // block until the limiter grants permits for the bytes just read
      rateLimiter.acquire(bytesRead);
    }
    progress.addAndGet(bytesRead);
    return bytesRead;
  }

  @Override
  public boolean isOpen() {
    return inner.isOpen();
  }

  @Override
  public void close() throws IOException {
    inner.close();
  }
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package com.onthegomap.planetiler.util;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.ClosedFileSystemException;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
Expand Down Expand Up @@ -263,7 +268,7 @@ public static void unzipResource(String resource, Path dest) {
* @throws UncheckedIOException if an IO exception occurs
*/
public static void safeCopy(InputStream inputStream, Path destPath) {
try (var outputStream = Files.newOutputStream(destPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
try (var outputStream = Files.newOutputStream(destPath, StandardOpenOption.CREATE, WRITE)) {
int totalSize = 0;

int nBytes;
Expand Down Expand Up @@ -310,7 +315,7 @@ public static void unzip(InputStream input, Path destDir) {

try (
var out = Files.newOutputStream(destination, StandardOpenOption.CREATE_NEW,
StandardOpenOption.WRITE)
WRITE)
) {
totalEntryArchive++;
while ((nBytes = zip.read(buffer)) > 0) {
Expand Down Expand Up @@ -366,4 +371,16 @@ public static boolean isNewer(Path src, Path dest) {
return true;
}
}

/**
 * Expands the file at {@code path} to {@code size} bytes, creating it if it does not already
 * exist. Does not shrink a file that is already larger than {@code size}.
 *
 * @param path the file to expand
 * @param size the minimum length in bytes the file should have afterwards
 * @throws UncheckedIOException if an IO exception occurs
 */
public static void setLength(Path path, long size) {
  try (var fc = FileChannel.open(path, CREATE, WRITE)) {
    if (size > 0) {
      // write a single byte at the last offset to force allocation up to size bytes
      int written = fc.write(ByteBuffer.allocate(1), size - 1);
      if (written != 1) {
        throw new IOException("Unable to expand " + path + " to " + size);
      }
    }
    // size <= 0: opening with CREATE already ensures the file exists; writing at a negative
    // offset would throw IllegalArgumentException, so skip the write entirely
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}
}
Loading

0 comments on commit 8899689

Please sign in to comment.