Skip to content

Commit

Permalink
ByteBuffer support (#12)
Browse files Browse the repository at this point in the history
* ByteBuffer support

* modified copyToByteBuffer to advance buffer's position after copy

* Expands ByteBuffer support to include MemoryPool

- Adds support for Read-only ByteBuffers
  • Loading branch information
asonje authored Dec 21, 2021
1 parent c9073b2 commit f66285e
Show file tree
Hide file tree
Showing 12 changed files with 1,276 additions and 2 deletions.
6 changes: 6 additions & 0 deletions src/main/cpp/com_intel_pmem_llpl_MemoryAccessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,9 @@ JNIEXPORT jint JNICALL Java_com_intel_pmem_llpl_MemoryAccessor_nativeHasAutoFlus
{
return pmem_has_auto_flush();
}

JNIEXPORT jlong JNICALL Java_com_intel_pmem_llpl_MemoryAccessor_nativeGetDirectByteBufferAddress
(JNIEnv *env, jobject obj, jobject buf)
{
return (jlong)env->GetDirectBufferAddress(buf);
}
3 changes: 3 additions & 0 deletions src/main/cpp/com_intel_pmem_llpl_MemoryAccessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ JNIEXPORT jint JNICALL Java_com_intel_pmem_llpl_MemoryAccessor_nativeAddRangeToT

JNIEXPORT jint JNICALL Java_com_intel_pmem_llpl_MemoryAccessor_nativeHasAutoFlush
(JNIEnv *env, jobject obj);

JNIEXPORT jlong JNICALL Java_com_intel_pmem_llpl_MemoryAccessor_nativeGetDirectByteBufferAddress
(JNIEnv *env, jobject obj, jobject buf);
#ifdef __cplusplus
}
#endif
Expand Down
7 changes: 7 additions & 0 deletions src/main/cpp/com_intel_pmem_llpl_MemoryPoolImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ JNIEXPORT void JNICALL Java_com_intel_pmem_llpl_MemoryPoolImpl_nativeCopyFromByt
env->ReleaseByteArrayElements(srcArray, src, 0);
}

JNIEXPORT void JNICALL Java_com_intel_pmem_llpl_MemoryPoolImpl_nativeCopyFromByteBufferNT
(JNIEnv *env, jobject obj, jobject srcBuf, jint srcIndex, jlong dst, jint byteCount)
{
void* src = env->GetDirectBufferAddress(srcBuf);
void *addr = pmem_memcpy((void *)dst, src + srcIndex, byteCount, PMEM_F_MEM_NONTEMPORAL);
}

JNIEXPORT void JNICALL Java_com_intel_pmem_llpl_MemoryPoolImpl_nativeSetMemoryNT
(JNIEnv *env, jobject obj, jlong offset, jlong length, jbyte value)
{
Expand Down
3 changes: 3 additions & 0 deletions src/main/cpp/com_intel_pmem_llpl_MemoryPoolImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ JNIEXPORT jlong JNICALL Java_com_intel_pmem_llpl_MemoryPoolImpl_nativePoolSize
JNIEXPORT void JNICALL Java_com_intel_pmem_llpl_MemoryPoolImpl_nativeCopyFromByteArrayNT
(JNIEnv *env, jobject obj, jbyteArray srcArray, jint srcIndex, jlong dst, jint byteCount);

JNIEXPORT void JNICALL Java_com_intel_pmem_llpl_MemoryPoolImpl_nativeCopyFromByteBufferNT
(JNIEnv *env, jobject obj, jobject srcBuf, jint srcIndex, jlong dst, jint byteCount);

JNIEXPORT void JNICALL Java_com_intel_pmem_llpl_MemoryPoolImpl_nativeSetMemoryNT
(JNIEnv *env, jobject obj, jlong offset, jlong length, jbyte value);

Expand Down
74 changes: 74 additions & 0 deletions src/main/java/com/intel/pmem/llpl/MemoryAccessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.function.Supplier;
import java.util.function.Function;
import java.util.function.Consumer;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

/**
* The base class for all memory accessor classes.
Expand Down Expand Up @@ -225,6 +227,39 @@ public void copyToArray(long srcOffset, byte[] dstArray, int dstOffset, int leng
uncheckedCopyToArray(directAddress() + metadataSize() + srcOffset, dstArray, dstOffset, length);
}

/**
* Copies {@code length} bytes from this accessor's memory, starting at {@code srcOffset}, to
* {@code dstBuf}.
* @param srcOffset the starting offset in this accessor's memory
* @param dstBuf the destination {@code ByteBuffer}
* @param length the number of bytes to copy
* @throws IndexOutOfBoundsException if copying would cause access of data outside of buffer bounds or
* outside of accessor bounds or, for compact allocations, outside of heap bounds
* @throws IllegalStateException if the accessor is not in a valid state for use
*/
public void copyToByteBuffer(long srcOffset, ByteBuffer dstBuf, int length) {
if (dstBuf.isReadOnly()) throw new ReadOnlyBufferException();
checkValid();
checkBoundsAndLength(srcOffset, length);
int size;
if ((size = dstBuf.remaining()) < length) throw new IndexOutOfBoundsException("Insufficient space remaining in destination buffer");
if (dstBuf.isDirect()) {
long dstAddress = nativeGetDirectByteBufferAddress(dstBuf);
if (dstAddress == 0) throw new IllegalArgumentException("Invalid ByteBuffer");
uncheckedCopyBlockToBlock(directAddress() + metadataSize() + srcOffset, dstAddress + dstBuf.position(), length);
dstBuf.position(dstBuf.position() + length);
}
else if (dstBuf.hasArray()) {
uncheckedCopyToArray(directAddress() + metadataSize() + srcOffset, dstBuf.array(), dstBuf.position(), length);
dstBuf.position(dstBuf.position() + length);
}
else {
byte[] tmp = new byte[length];
uncheckedCopyToArray(directAddress() + metadataSize() + srcOffset, tmp, 0, length);
dstBuf.put(tmp);
}
}

/**
* Stores the supplied {@code byte} value at {@code offset} within this accessor's memory.
* The semantics of the method depend on the implementing subclass. Persistent accessor classes
Expand Down Expand Up @@ -328,6 +363,44 @@ void copyFromMemoryBlock(AnyMemoryBlock srcBlock, long srcOffset, long dstOffset
*/
public abstract void copyFromArray(byte[] srcArray, int srcIndex, long dstOffset, int length);

/**
* Copies {@code srcBuf.remaining()} bytes from {@code srcBuf}, to this accessor's memory
* starting at {@code dstOffset}.
* The semantics of the method depend on the implementing subclass. Persistent accessor classes
* will copy memory durably and transactional accessor classes will copy memory transactionally.
* @param srcBuf the {@code ByteBuffer} from which to copy bytes
* @param dstOffset the starting offset to which bytes are to be copied
* @throws IndexOutOfBoundsException if copying would cause access of data outside of accessor
* bounds or, for compact accessors, outside of heap bounds
* @throws IllegalStateException if this accessor is not in a valid state for use
*/
public void copyFromByteBuffer(ByteBuffer srcBuf, long dstOffset) {
int size;
if ((size = srcBuf.remaining()) == 0) return;
if (srcBuf.isDirect()) {
checkValid();
checkBoundsAndLength(dstOffset, size);
long srcAddress = nativeGetDirectByteBufferAddress(srcBuf);
if (srcAddress == 0) throw new IllegalArgumentException("Invalid ByteBuffer");
Function<Range, Object> op = (Range range) -> {
range.rawCopyFromDirectByteBuffer(srcAddress + srcBuf.position(), directAddress() + metadataSize() + dstOffset, size);
return null;
};
// TODO optimize out instanceof
if (heap instanceof Heap) rawWithRange(dstOffset, size, op);
else if (heap instanceof PersistentHeap) durableWithRange(dstOffset, size, op);
else if (heap instanceof TransactionalHeap) transactionalWithRange(dstOffset, size, op);
}
else if (srcBuf.hasArray()) {
copyFromArray(srcBuf.array(), srcBuf.position(), dstOffset, srcBuf.remaining());
}
else {
byte[] tmp = new byte[size];
srcBuf.get(tmp);
copyFromArray(tmp, 0, dstOffset, size);
}
}

/**
* Sets {@code length} bytes in this accessor's memory, starting at {@code offset}, to the supplied {@code byte}
* value.
Expand Down Expand Up @@ -641,4 +714,5 @@ static String outOfBoundsMessage(long offset, long length)
private native static int nativeHasAutoFlush();
native static int nativeAddToTransactionNoCheck(long address, long size);
native static int nativeAddRangeToTransaction(long poolHandle, long address, long size);
static native long nativeGetDirectByteBufferAddress(ByteBuffer buf);
}
31 changes: 31 additions & 0 deletions src/main/java/com/intel/pmem/llpl/MemoryPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,26 @@ public static boolean exists(String path) {
*/
public void copyFromByteArray(byte[] srcArray, int srcIndex, long dstOffset, int byteCount);

/**
* Copies {@code srcBuf.remaining()} bytes from {@code srcBuf}, starting at {@code srcBuf.position()}
* to this pool's memory starting at {@code dstOffset}.
* @param srcBuf the {@code ByteBuffer} from which to copy bytes
* @param dstOffset the starting offset to which bytes are to be copied
* @throws IndexOutOfBoundsException if copying would cause access of data outside of buffer bounds or
* outside of pool bounds
*/
public void copyFromByteBuffer(ByteBuffer srcBuf, long dstOffset);

/**
* Copies {@code srcBuf.remaining()} bytes from {@code srcBuf} in a non-temporal way, starting at
* {@code srcBuf.position()} to this pool's memory starting at {@code dstOffset}.
* @param srcBuf the {@code ByteBuffer} from which to copy bytes
* @param dstOffset the starting offset to which bytes are to be copied
* @throws IndexOutOfBoundsException if copying would cause access of data outside of buffer bounds or
* outside of pool bounds
*/
public void copyFromByteBufferNT(ByteBuffer srcBuf, long dstOffset);

/**
* Copies {@code byteCount} bytes from this pool's memory, starting at {@code srcOffset}, to the
* {@code dstArray} byte array starting at array index {@code dstIndex}.
Expand All @@ -203,6 +223,17 @@ public static boolean exists(String path) {
*/
public void copyToByteArray(long srcOffset, byte[] dstArray, int dstIndex, int byteCount);

/**
* Copies {@code byteCount} bytes from this pool's memory, starting at {@code srcOffset}, to the
* {@code dstBuf}.
* @param srcOffset the starting offset in this pool's memory
* @param dstBuf the destination {@code ByteBuffer}
* @param byteCount the number of bytes to copy
* @throws IndexOutOfBoundsException if copying would cause access of data outside of buffer bounds or
* outside of pool bounds
*/
public void copyToByteBuffer(long srcOffset, ByteBuffer dstBuf, int byteCount);

/**
* Sets {@code byteCount} bytes in this pool's memory, starting at {@code offset}, to the specified {@code value}.
* @param value the value to set
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/com/intel/pmem/llpl/MemoryPoolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import sun.misc.Unsafe;

class MemoryPoolImpl implements MemoryPool {
Expand Down Expand Up @@ -173,6 +174,67 @@ public void copyToByteArray(long srcOffset, byte[] dstArray, int dstIndex, int b
UNSAFE.copyMemory(null, dataAddress(srcOffset), dstArray, dstAddress, byteCount);
}

@Override
public void copyFromByteBuffer(ByteBuffer srcBuf, long dstOffset) {
int size;
if ((size = srcBuf.remaining()) == 0) return;
if (srcBuf.isDirect()) {
checkBounds(dstOffset, size);
long srcAddress = MemoryAccessor.nativeGetDirectByteBufferAddress(srcBuf);
if (srcAddress == 0) throw new IllegalArgumentException("Invalid ByteBuffer");
UNSAFE.copyMemory(srcAddress + srcBuf.position(), dataAddress(dstOffset), size);
}
else if (srcBuf.hasArray()) {
copyFromByteArray(srcBuf.array(), srcBuf.position(), dstOffset, srcBuf.remaining());
}
else {
byte[] tmp = new byte[size];
srcBuf.get(tmp);
copyFromByteArray(tmp, 0, dstOffset, size);
}
}

@Override
public void copyFromByteBufferNT(ByteBuffer srcBuf, long dstOffset) {
if (srcBuf.isDirect()) {
int size;
if ((size = srcBuf.remaining()) == 0) return;
checkBounds(dstOffset, size);
nativeCopyFromByteBufferNT(srcBuf, srcBuf.position(), dataAddress(dstOffset), size);
}
else if (srcBuf.hasArray()) {
copyFromByteArrayNT(srcBuf.array(), srcBuf.position(), dstOffset, srcBuf.remaining());
}
else {
byte[] tmp = new byte[srcBuf.remaining()];
srcBuf.get(tmp);
copyFromByteArray(tmp, 0, dstOffset, tmp.length);
}
}

@Override
public void copyToByteBuffer(long srcOffset, ByteBuffer dstBuf, int byteCount) {
if (dstBuf.isReadOnly()) throw new ReadOnlyBufferException();
int size;
if ((size = dstBuf.remaining()) < byteCount) throw new IndexOutOfBoundsException("Insufficient space remaining in destination buffer");
if (dstBuf.isDirect()) {
long dstAddress = MemoryAccessor.nativeGetDirectByteBufferAddress(dstBuf);
if (dstAddress == 0) throw new IllegalArgumentException("Invalid ByteBuffer");
checkBounds(srcOffset, byteCount);
UNSAFE.copyMemory(dataAddress(srcOffset), dstAddress + dstBuf.position(), size);
dstBuf.position(dstBuf.position() + byteCount);
}
else if (dstBuf.hasArray()) {
copyToByteArray(srcOffset, dstBuf.array(), dstBuf.position(), byteCount);
dstBuf.position(dstBuf.position() + byteCount);
}
else {
byte[] tmp = new byte[byteCount];
copyToByteArray(srcOffset, tmp, 0, byteCount);
dstBuf.put(tmp);
}
}

@Override
public void setMemory(byte value, long offset, long byteCount) {
checkBounds(offset, byteCount);
Expand Down Expand Up @@ -239,5 +301,6 @@ static String indexOutOfBoundsMessage(long index, long length)
private static native long nativePoolSize(String path);
private static native void nativeCopyMemoryNT(long srcOffset, long dstOffset, long byteCount);
private static native void nativeCopyFromByteArrayNT(byte[] srcArray, int srcIndex, long dst, int byteCount);
private static native void nativeCopyFromByteBufferNT(ByteBuffer srcBuf, int srcIndex, long dst, int byteCount);
private static native void nativeSetMemoryNT(long offset, long length, byte value);
}
29 changes: 29 additions & 0 deletions src/main/java/com/intel/pmem/llpl/Range.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package com.intel.pmem.llpl;

import java.nio.ByteBuffer;

/**
* Implements methods suitable for writing within a contiguous range of locations in an associated block of memory. An instance of this class
* is provided as an argument to user-supplied functions associated with ranged operations, such as
Expand Down Expand Up @@ -190,6 +192,33 @@ public void setMemory(byte value, long offset, long length) {
MemoryAccessor.uncheckedSetMemory(accessor.directAddress() + accessor.metadataSize() + offset, value, length);
}

/**
* Copies {@code srcBuf.remaining()} bytes from {@code srcBuf}, to the block of memory associated
* with this range, starting at {@code dstOffset}.
* @param srcBuf the {@code ByteBuffer} from which to copy bytes
* @param dstOffset the starting offset to which bytes are to be copied
* @throws IndexOutOfBoundsException if copying would cause access of data outside of this range's bounds
* @throws IllegalStateException if this range is not in a valid state for use
*/
public void copyFromByteBuffer(ByteBuffer srcBuf, long dstOffset) {
int size;
if ((size = srcBuf.remaining()) == 0) return;
if (srcBuf.isDirect()) {
checkValid();
checkBoundsAndLength(dstOffset, size);
long srcAddress = MemoryAccessor.nativeGetDirectByteBufferAddress(srcBuf);
if (srcAddress <= 0) throw new IllegalArgumentException("Invalid ByteBuffer");
MemoryAccessor.uncheckedCopyBlockToBlock(srcAddress + srcBuf.position(), accessor.directAddress() + accessor.metadataSize() + dstOffset, size);
}
else {
copyFromArray(srcBuf.array(), srcBuf.position(), dstOffset, srcBuf.remaining());
}
}

void rawCopyFromDirectByteBuffer(long srcAddress, long dstOffset, long length) {
MemoryAccessor.uncheckedCopyBlockToBlock(srcAddress, dstOffset, length);
}

void flush() {
checkValid();
accessor.internalFlush(startOffset, rangeLength);
Expand Down
Loading

0 comments on commit f66285e

Please sign in to comment.