From 8899689593b070256234496055a429d80d642ae2 Mon Sep 17 00:00:00 2001 From: Mike Barry Date: Mon, 30 Oct 2023 05:59:46 -0400 Subject: [PATCH] more tweaks --- .../planetiler/util/Downloader.java | 124 +++++++----------- .../onthegomap/planetiler/util/FileUtils.java | 21 ++- .../planetiler/util/DownloaderTest.java | 48 ++++--- .../planetiler/util/FileUtilsTest.java | 7 + 4 files changed, 94 insertions(+), 106 deletions(-) diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java index 28f1d9a80f..530e200a0f 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Downloader.java @@ -5,6 +5,7 @@ import com.google.common.util.concurrent.RateLimiter; import com.onthegomap.planetiler.config.PlanetilerConfig; +import com.onthegomap.planetiler.stats.Counter; import com.onthegomap.planetiler.stats.ProgressLoggers; import com.onthegomap.planetiler.stats.Stats; import com.onthegomap.planetiler.worker.RunnableThatThrows; @@ -18,9 +19,7 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import java.nio.channels.FileChannel; -import java.nio.channels.ReadableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -33,7 +32,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,15 +65,14 @@ public class Downloader { private static final Logger LOGGER = LoggerFactory.getLogger(Downloader.class); private final PlanetilerConfig config; private final List toDownloadList = new ArrayList<>(); - private final HttpClient client = HttpClient.newBuilder() - // explicitly follow redirects to capture final redirect url - .followRedirects(HttpClient.Redirect.NEVER).build(); + private final HttpClient client; private final ExecutorService executor; private final Stats stats; private final long chunkSizeBytes; private final ResourceUsage diskSpaceCheck = new ResourceUsage("download"); private final RateLimiter rateLimiter; private final Semaphore concurrentDownloads; + private final Semaphore concurrentDiskWrites; Downloader(PlanetilerConfig config, Stats stats, long chunkSizeBytes) { this.rateLimiter = config.downloadMaxBandwidth() == 0 ? null : RateLimiter.create(config.downloadMaxBandwidth()); @@ -83,7 +80,13 @@ public class Downloader { this.config = config; this.stats = stats; this.executor = Executors.newVirtualThreadPerTaskExecutor(); + this.client = HttpClient.newBuilder() + // explicitly follow redirects to capture final redirect url + .followRedirects(HttpClient.Redirect.NEVER) + .executor(executor) + .build(); this.concurrentDownloads = new Semaphore(config.downloadThreads()); + this.concurrentDiskWrites = new Semaphore(10); } public static Downloader create(PlanetilerConfig config, Stats stats) { @@ -170,7 +173,8 @@ public void run() { for (var toDownload : toDownloadList) { try { long size = toDownload.metadata.get(10, TimeUnit.SECONDS).size; - loggers.addStorageRatePercentCounter(toDownload.id, size, toDownload::bytesDownloaded, true); + loggers.addStorageRatePercentCounter(toDownload.id, size, toDownload::bytesDownloaded, true) + .addFileSize(toDownload.tmpPath()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException("Error getting size of " + toDownload.url, e); @@ -188,6 +192,7 @@ CompletableFuture downloadIfNecessary(ResourceToDownload resourceToDownloa LogUtil.setStage("download", resourceToDownload.id); long existingSize = FileUtils.size(resourceToDownload.output); var metadata = httpHeadFollowRedirects(resourceToDownload.url, 0); + Path tmpPath = resourceToDownload.tmpPath(); resourceToDownload.metadata.complete(metadata); if (metadata.size == existingSize) { LOGGER.info("Skipping {}: {} already up-to-date", resourceToDownload.id, resourceToDownload.output); @@ -199,18 +204,18 @@ CompletableFuture downloadIfNecessary(ResourceToDownload resourceToDownloa LOGGER.info("Downloading {}{} to {}", resourceToDownload.url, redirectInfo, resourceToDownload.output); FileUtils.delete(resourceToDownload.output); FileUtils.createParentDirectories(resourceToDownload.output); - Path tmpPath = resourceToDownload.tmpPath(); FileUtils.delete(tmpPath); FileUtils.deleteOnExit(tmpPath); diskSpaceCheck.addDisk(tmpPath, metadata.size, resourceToDownload.id); diskSpaceCheck.checkAgainstLimits(config.force(), false); httpDownload(resourceToDownload, tmpPath); Files.move(tmpPath, resourceToDownload.output); - FileUtils.delete(tmpPath); LOGGER.info("Finished downloading {} to {}", resourceToDownload.url, resourceToDownload.output); } catch (Exception e) { LOGGER.error("Error downloading {} to {}", resourceToDownload.url, resourceToDownload.output, e); throw e; + } finally { + FileUtils.delete(tmpPath); } }), executor); } @@ -247,24 +252,10 @@ ResourceMetadata httpHead(String url) throws IOException, InterruptedException { } private void httpDownload(ResourceToDownload resource, Path tmpPath) - throws ExecutionException, InterruptedException, IOException { - /* - * Alternative using async HTTP client: - * - * return client.sendAsync(newHttpRequest(url).GET().build(), responseInfo -> { - * assertOK(responseInfo); - * return HttpResponse.BodyHandlers.ofFile(path).apply(responseInfo); - * - * But it is slower on large files - */ + throws ExecutionException, InterruptedException { var metadata = resource.metadata().get(); String canonicalUrl = metadata.canonicalUrl(); - record Range(long start, long end) { - - long size() { - return end - start; - } - } + record Range(long start, long end) {} List chunks = new ArrayList<>(); boolean ranges = metadata.acceptRange && config.downloadThreads() > 1; long chunkSize = ranges ? chunkSizeBytes : metadata.size; @@ -272,36 +263,41 @@ long size() { long end = Math.min(start + chunkSize, metadata.size); chunks.add(new Range(start, end)); } - Files.createFile(tmpPath); - Worker.joinFutures(chunks.stream().map(chunk -> CompletableFuture.runAsync(RunnableThatThrows.wrap(() -> { + FileUtils.setLength(tmpPath, metadata.size); + Worker.joinFutures(chunks.stream().map(range -> CompletableFuture.runAsync(RunnableThatThrows.wrap(() -> { LogUtil.setStage("download", resource.id); concurrentDownloads.acquire(); - try (var fileChannel = FileChannel.open(tmpPath, WRITE)) { - var range = chunk; - while (range.size() > 0) { - try ( - var inputStream = (ranges || range.start > 0) ? openStreamRange(canonicalUrl, range.start, range.end) : - openStream(canonicalUrl); - var input = new ProgressChannel(Channels.newChannel(inputStream), resource.progress, rateLimiter) - ) { - // ensure this file has been allocated up to the start of this block - fileChannel.write(ByteBuffer.allocate(1), range.start); - fileChannel.position(range.start); - long transferred = fileChannel.transferFrom(input, range.start, range.size()); - if (transferred == 0) { - throw new IOException("Transferred 0 bytes but " + range.size() + " expected: " + canonicalUrl); - } else if (transferred != range.size() && !metadata.acceptRange) { - throw new IOException( - "Transferred " + transferred + " bytes but " + range.size() + " expected: " + canonicalUrl + - " and server does not support range requests"); + var counter = resource.progress.counterForThread(); + try ( + var fc = FileChannel.open(tmpPath, WRITE); + var inputStream = (ranges || range.start > 0) ? + openStreamRange(canonicalUrl, range.start, range.end) : + openStream(canonicalUrl); + ) { + long offset = range.start; + byte[] buffer = new byte[16384]; + int read; + while (offset < range.end && (read = inputStream.read(buffer, 0, 16384)) >= 0) { + counter.incBy(read); + if (rateLimiter != null) { + rateLimiter.acquire(read); + } + int position = 0; + int remaining = read; + while (remaining > 0) { + int written = fc.write(ByteBuffer.wrap(buffer, position, remaining), offset); + if (written <= 0) { + throw new IOException("Failed to write to " + tmpPath); } - range = new Range(range.start + transferred, range.end); + position += written; + remaining -= written; + offset += written; } } } finally { concurrentDownloads.release(); } - }), executor)).toArray(CompletableFuture[]::new)); + }), executor)).toArray(CompletableFuture[]::new)).get(); } private HttpRequest.Builder newHttpRequest(String url) { @@ -313,11 +309,12 @@ private HttpRequest.Builder newHttpRequest(String url) { record ResourceMetadata(Optional redirect, String canonicalUrl, long size, boolean acceptRange) {} record ResourceToDownload( - String id, String url, Path output, CompletableFuture metadata, AtomicLong progress + String id, String url, Path output, CompletableFuture metadata, + Counter.MultiThreadCounter progress ) { ResourceToDownload(String id, String url, Path output) { - this(id, url, output, new CompletableFuture<>(), new AtomicLong(0)); + this(id, url, output, new CompletableFuture<>(), Counter.newMultiThreadCounter()); } public Path tmpPath() { @@ -328,33 +325,4 @@ public long bytesDownloaded() { return progress.get(); } } - - /** - * Wrapper for a {@link ReadableByteChannel} that captures progress information. - */ - private record ProgressChannel(ReadableByteChannel inner, AtomicLong progress, RateLimiter rateLimiter) - implements ReadableByteChannel { - - @Override - public int read(ByteBuffer dst) throws IOException { - int n = inner.read(dst); - if (n > 0) { - if (rateLimiter != null) { - rateLimiter.acquire(n); - } - progress.addAndGet(n); - } - return n; - } - - @Override - public boolean isOpen() { - return inner.isOpen(); - } - - @Override - public void close() throws IOException { - inner.close(); - } - } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java index cbd6037381..1cbbe13875 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java @@ -1,8 +1,13 @@ package com.onthegomap.planetiler.util; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.WRITE; + import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.ClosedFileSystemException; import java.nio.file.FileStore; import java.nio.file.FileSystem; @@ -263,7 +268,7 @@ public static void unzipResource(String resource, Path dest) { * @throws UncheckedIOException if an IO exception occurs */ public static void safeCopy(InputStream inputStream, Path destPath) { - try (var outputStream = Files.newOutputStream(destPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + try (var outputStream = Files.newOutputStream(destPath, StandardOpenOption.CREATE, WRITE)) { int totalSize = 0; int nBytes; @@ -310,7 +315,7 @@ public static void unzip(InputStream input, Path destDir) { try ( var out = Files.newOutputStream(destination, StandardOpenOption.CREATE_NEW, - StandardOpenOption.WRITE) + WRITE) ) { totalEntryArchive++; while ((nBytes = zip.read(buffer)) > 0) { @@ -366,4 +371,16 @@ public static boolean isNewer(Path src, Path dest) { return true; } } + + /** Expands the file at {@code path} to {@code size} bytes. */ + public static void setLength(Path path, long size) { + try (var fc = FileChannel.open(path, CREATE, WRITE)) { + int written = fc.write(ByteBuffer.allocate(1), size - 1); + if (written != 1) { + throw new IOException("Unable to expand " + path + " to " + size); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java index f7143977e8..ef367e2842 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/DownloaderTest.java @@ -9,12 +9,11 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -26,25 +25,25 @@ class DownloaderTest { Path path; private final PlanetilerConfig config = PlanetilerConfig.defaults(); private final Stats stats = Stats.inMemory(); - private long downloads = 0; + private AtomicLong downloads = new AtomicLong(0); - private Downloader mockDownloader(Map resources, boolean supportsRange, int maxLength) { + private Downloader mockDownloader(Map resources, boolean supportsRange) { return new Downloader(config, stats, 2L) { @Override InputStream openStream(String url) { - downloads++; + downloads.incrementAndGet(); assertTrue(resources.containsKey(url), "no resource for " + url); byte[] bytes = resources.get(url); - return new ByteArrayInputStream(maxLength < bytes.length ? Arrays.copyOf(bytes, maxLength) : bytes); + return new ByteArrayInputStream(bytes); } @Override InputStream openStreamRange(String url, long start, long end) { assertTrue(supportsRange, "does not support range"); - downloads++; + downloads.incrementAndGet(); assertTrue(resources.containsKey(url), "no resource for " + url); - byte[] result = new byte[Math.min(maxLength, (int) (end - start))]; + byte[] result = new byte[(int) (end - start)]; byte[] bytes = resources.get(url); for (int i = (int) start; i < start + result.length; i++) { result[(int) (i - start)] = bytes[i]; @@ -53,31 +52,28 @@ InputStream openStreamRange(String url, long start, long end) { } @Override - CompletableFuture httpHead(String url) { + ResourceMetadata httpHead(String url) { String[] parts = url.split("#"); if (parts.length > 1) { int redirectNum = Integer.parseInt(parts[1]); String next = redirectNum <= 1 ? parts[0] : (parts[0] + "#" + (redirectNum - 1)); - return CompletableFuture.supplyAsync( - () -> new ResourceMetadata(Optional.of(next), url, 0, supportsRange)); + return new ResourceMetadata(Optional.of(next), url, 0, supportsRange); } byte[] bytes = resources.get(url); - return CompletableFuture.supplyAsync( - () -> new ResourceMetadata(Optional.empty(), url, bytes.length, supportsRange)); + return new ResourceMetadata(Optional.empty(), url, bytes.length, supportsRange); } }; } @ParameterizedTest @CsvSource({ - "false,100,0", - "true,100,0", - "true,2,0", - "false,100,1", - "false,100,2", - "true,2,4", + "false,0", + "true,0", + "false,1", + "false,2", + "true,4", }) - void testDownload(boolean range, int maxLength, int redirects) throws Exception { + void testDownload(boolean range, int redirects) throws Exception { Path dest = path.resolve("out"); String string = "0123456789"; String url = "http://url"; @@ -85,7 +81,7 @@ void testDownload(boolean range, int maxLength, int redirects) throws Exception Map resources = new ConcurrentHashMap<>(); byte[] bytes = string.getBytes(StandardCharsets.UTF_8); - Downloader downloader = mockDownloader(resources, range, maxLength); + Downloader downloader = mockDownloader(resources, range); // fails if no data var resource1 = new Downloader.ResourceToDownload("resource", initialUrl, dest); @@ -102,10 +98,10 @@ void testDownload(boolean range, int maxLength, int redirects) throws Exception assertEquals(10, resource2.bytesDownloaded()); // does not re-request if size is the same - downloads = 0; + downloads.set(0); var resource3 = new Downloader.ResourceToDownload("resource", initialUrl, dest); downloader.downloadIfNecessary(resource3).get(); - assertEquals(0, downloads); + assertEquals(0, downloads.get()); assertEquals(string, Files.readString(dest)); assertEquals(FileUtils.size(path), FileUtils.size(dest)); assertEquals(0, resource3.bytesDownloaded()); @@ -115,7 +111,7 @@ void testDownload(boolean range, int maxLength, int redirects) throws Exception String newContent = "54321"; resources.put(url, newContent.getBytes(StandardCharsets.UTF_8)); downloader.downloadIfNecessary(resource4).get(); - assertTrue(downloads > 0, "downloads were " + downloads); + assertTrue(downloads.get() > 0, "downloads were " + downloads); assertEquals(newContent, Files.readString(dest)); assertEquals(FileUtils.size(path), FileUtils.size(dest)); assertEquals(5, resource4.bytesDownloaded()); @@ -136,8 +132,8 @@ InputStream openStreamRange(String url, long start, long end) { } @Override - CompletableFuture httpHead(String url) { - return CompletableFuture.completedFuture(new ResourceMetadata(Optional.empty(), url, Long.MAX_VALUE, true)); + ResourceMetadata httpHead(String url) { + return new ResourceMetadata(Optional.empty(), url, Long.MAX_VALUE, true); } }; diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java index 0dbb5b713f..2f480cbdec 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java @@ -152,4 +152,11 @@ void testWalkPathWithPatternSingleZip() { List.of("/shapefile/stations.shp", "/shapefile/stations.shx"), matchingPaths.stream().map(Path::toString).sorted().toList()); } + + @Test + void testExpandFile() throws IOException { + Path path = tmpDir.resolve("toExpand"); + FileUtils.setLength(path, 1000); + assertEquals(1000, Files.size(path)); + } }