Skip to content

Commit

Permalink
WIP. Switching to writing List<BlobSidecar>
Browse files Browse the repository at this point in the history
  • Loading branch information
david-ry4n committed Oct 4, 2024
1 parent bb14b6a commit a77a5ac
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ protected SafeFuture<?> doStart() {
final DataArchive dataArchive =
config
.getBlobsArchivePath()
.map(path -> (DataArchive) new FileSystemArchive(Path.of(path)))
.map(
path ->
(DataArchive) new FileSystemArchive(config.getSpec(), Path.of(path)))
.orElse(new NoopDataArchive());

if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.storage.archive;

import java.io.IOException;
import java.util.List;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;

/**
Expand All @@ -30,5 +31,5 @@ public interface DataArchive {
* @return a closeable DataArchiveWriter for writing BlobSidecars
* @throws IOException throw exception if it fails to get a writer.
*/
DataArchiveWriter<BlobSidecar> getBlobSidecarWriter() throws IOException;
DataArchiveWriter<List<BlobSidecar>> getBlobSidecarWriter() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,36 @@

package tech.pegasys.teku.storage.archive.fsarchive;

import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.BYTES32_TYPE;
import static tech.pegasys.teku.infrastructure.json.types.CoreTypes.UINT64_TYPE;
import static tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition.listOf;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import tech.pegasys.teku.infrastructure.json.JsonUtil;
import tech.pegasys.teku.infrastructure.json.types.DeserializableTypeDefinition;
import tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition;
import tech.pegasys.teku.infrastructure.json.types.StringValueTypeDefinition;
import tech.pegasys.teku.kzg.KZGCommitment;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionCache;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb;

public class BlobSidecarJsonWriter {

public static final StringValueTypeDefinition<KZGCommitment> KZG_COMMITMENT_TYPE =
DeserializableTypeDefinition.string(KZGCommitment.class)
.formatter(KZGCommitment::toHexString)
.parser(KZGCommitment::fromHexString)
.example(
"0xb09ce4964278eff81a976fbc552488cb84fc4a102f004c87"
+ "179cb912f49904d1e785ecaf5d184522a58e9035875440ef")
.description("KZG Commitment")
.format("byte")
.build();

public static final SerializableTypeDefinition<BlobSidecar> BLOB_SIDECAR_TYPE =
SerializableTypeDefinition.object(BlobSidecar.class)
.name("BlobSidecar")
.withField("index", UINT64_TYPE, BlobSidecar::getIndex)
// .withField( "blob", CoreTypes, BlobSidecar::getBlob)
.withField("block_root", BYTES32_TYPE, BlobSidecar::getBlockRoot)
.withField("slot", UINT64_TYPE, BlobSidecar::getSlot)
.withField("kzg_commitment", KZG_COMMITMENT_TYPE, BlobSidecar::getKZGCommitment)
.build();

public BlobSidecarJsonWriter() {}

public void writeBlobSidecar(final OutputStream out, final BlobSidecar blobSidecar)
private final SchemaDefinitionCache schemaCache;

public BlobSidecarJsonWriter(final Spec spec) {
this.schemaCache = new SchemaDefinitionCache(spec);
}

public void writeSlotBlobSidecars(final OutputStream out, final List<BlobSidecar> blobSidecar)
throws IOException {

String output = JsonUtil.prettySerialize(blobSidecar, BLOB_SIDECAR_TYPE);
final DeserializableTypeDefinition<BlobSidecar> blobSidecarType =
SchemaDefinitionsDeneb.required(schemaCache.getSchemaDefinition(SpecMilestone.DENEB))
.getBlobSidecarSchema()
.getJsonTypeDefinition();

String output = JsonUtil.prettySerialize(blobSidecar, listOf(blobSidecarType));
System.out.println(output);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.storage.archive.DataArchive;
Expand All @@ -40,13 +42,13 @@ public class FileSystemArchive implements DataArchive {
private final Path baseDirectory;
private final BlobSidecarJsonWriter jsonWriter;

public FileSystemArchive(final Path baseDirectory) {
public FileSystemArchive(final Spec spec, final Path baseDirectory) {
this.baseDirectory = baseDirectory;
this.jsonWriter = new BlobSidecarJsonWriter();
this.jsonWriter = new BlobSidecarJsonWriter(spec);
}

@Override
public DataArchiveWriter<BlobSidecar> getBlobSidecarWriter() throws IOException {
public DataArchiveWriter<List<BlobSidecar>> getBlobSidecarWriter() throws IOException {

try {
File indexFile = baseDirectory.resolve(INDEX_FILE).toFile();
Expand All @@ -57,7 +59,8 @@ public DataArchiveWriter<BlobSidecar> getBlobSidecarWriter() throws IOException
}
}

private class FileSystemBlobSidecarWriter implements DataArchiveWriter<BlobSidecar>, Closeable {
private class FileSystemBlobSidecarWriter
implements DataArchiveWriter<List<BlobSidecar>>, Closeable {
final BufferedWriter indexWriter;

public FileSystemBlobSidecarWriter(final File indexFile) throws IOException {
Expand All @@ -68,9 +71,12 @@ public FileSystemBlobSidecarWriter(final File indexFile) throws IOException {
}

@Override
public boolean archive(final BlobSidecar blobSidecar) {
public boolean archive(final List<BlobSidecar> blobSidecars) {
if (blobSidecars == null || blobSidecars.isEmpty()) {
return true;
}

SlotAndBlockRoot slotAndBlockRoot = blobSidecar.getSlotAndBlockRoot();
SlotAndBlockRoot slotAndBlockRoot = blobSidecars.getFirst().getSlotAndBlockRoot();
File file = resolve(baseDirectory, slotAndBlockRoot);
if (file.exists()) {
LOG.warn("Failed to write BlobSidecar. File exists: {}", file.toString());
Expand All @@ -83,7 +89,7 @@ public boolean archive(final BlobSidecar blobSidecar) {
}

try (FileOutputStream output = new FileOutputStream(file)) {
jsonWriter.writeBlobSidecar(output, blobSidecar);
jsonWriter.writeSlotBlobSidecars(output, blobSidecars);
indexWriter.write(formatIndexOutput(slotAndBlockRoot));
indexWriter.newLine();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
package tech.pegasys.teku.storage.archive.nooparchive;

import java.io.IOException;
import java.util.List;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.storage.archive.DataArchive;
import tech.pegasys.teku.storage.archive.DataArchiveWriter;

public class NoopDataArchive implements DataArchive {

@Override
public DataArchiveWriter<BlobSidecar> getBlobSidecarWriter() throws IOException {
public DataArchiveWriter<List<BlobSidecar>> getBlobSidecarWriter() throws IOException {
return new DataArchiveNoopWriter<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ void storeFinalizedBlocks(
* @return true if number of pruned blobs reached the pruneLimit, false otherwise
*/
boolean pruneOldestBlobSidecars(
UInt64 lastSlotToPrune, int pruneLimit, final DataArchiveWriter<BlobSidecar> archiveWriter);
UInt64 lastSlotToPrune,
int pruneLimit,
final DataArchiveWriter<List<BlobSidecar>> archiveWriter);

boolean pruneOldestNonCanonicalBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public Optional<UInt64> getEarliestBlobSidecarSlot() {
public boolean pruneOldestBlobSidecars(
final UInt64 lastSlotToPrune,
final int pruneLimit,
final DataArchiveWriter<BlobSidecar> archiveWriter) {
final DataArchiveWriter<List<BlobSidecar>> archiveWriter) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -152,7 +153,7 @@ private void pruneBlobsPriorToAvailabilityWindow() {
return;
}
LOG.debug("Pruning blobs up to slot {}, limit {}", latestPrunableSlot, pruneLimit);
try (DataArchiveWriter<BlobSidecar> archiveWriter = dataArchive.getBlobSidecarWriter()) {
try (DataArchiveWriter<List<BlobSidecar>> archiveWriter = dataArchive.getBlobSidecarWriter()) {
final long blobsPruningStart = System.currentTimeMillis();
final boolean blobsPruningLimitReached =
database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit, archiveWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -45,7 +47,8 @@ public class FileSystemArchiveTest {
Bytes32.fromHexString(
"0x391610cf24e7c540192b80ddcfea77b0d3912d94e922682f3b286eee041e6f76"));

private final Spec spec = TestSpecFactory.createMinimalDeneb();
private static final Spec spec = TestSpecFactory.createMinimalDeneb();

private final Predicates predicates = new Predicates(spec.getGenesisSpecConfig());
private final SchemaDefinitionsDeneb schemaDefinitionsDeneb =
SchemaDefinitionsDeneb.required(spec.getGenesisSchemaDefinitions());
Expand All @@ -61,7 +64,7 @@ public class FileSystemArchiveTest {
@BeforeAll
static void beforeEach() throws IOException {
Path temp = Files.createTempDirectory("blobs");
dataArchive = new FileSystemArchive(temp);
dataArchive = new FileSystemArchive(spec, temp);
}

BlobSidecar createBlobSidecar() {
Expand All @@ -75,7 +78,9 @@ BlobSidecar createBlobSidecar() {

@Test
void testWriteBlobSidecar() throws IOException {
DataArchiveWriter<BlobSidecar> blobWriter = dataArchive.getBlobSidecarWriter();
assertTrue(blobWriter.archive(createBlobSidecar()));
DataArchiveWriter<List<BlobSidecar>> blobWriter = dataArchive.getBlobSidecarWriter();
ArrayList<BlobSidecar> list = new ArrayList<>();
list.add(createBlobSidecar());
assertTrue(blobWriter.archive(list));
}
}

0 comments on commit a77a5ac

Please sign in to comment.