From e980a6785e35665868b74660c627c5ef256d5260 Mon Sep 17 00:00:00 2001 From: Ben Alex Date: Tue, 17 May 2016 21:06:52 +1000 Subject: [PATCH] Replace DirectBuffer with Agrona DirectBuffer --- lmdbjni-android/pom.xml | 19 + lmdbjni-linux64/pom.xml | 19 + lmdbjni-osx64/pom.xml | 19 + lmdbjni-win64/pom.xml | 19 + lmdbjni/pom.xml | 69 ++ .../org/fusesource/lmdbjni/BufferCursor.java | 57 +- .../java/org/fusesource/lmdbjni/Buffers.java | 60 ++ .../java/org/fusesource/lmdbjni/Cursor.java | 14 +- .../java/org/fusesource/lmdbjni/Database.java | 17 +- .../org/fusesource/lmdbjni/DirectBuffer.java | 678 ------------------ .../main/java/org/fusesource/lmdbjni/Env.java | 2 + .../org/fusesource/lmdbjni/Transaction.java | 3 +- .../java/org/fusesource/lmdbjni/Util.java | 36 + .../fusesource/lmdbjni/BufferCursorTest.java | 32 +- .../fusesource/lmdbjni/ComparatorTest.java | 2 + .../org/fusesource/lmdbjni/DatabaseTest.java | 3 +- .../java/org/fusesource/lmdbjni/EnvTest.java | 2 +- .../java/org/fusesource/lmdbjni/SbeTest.java | 315 ++++++++ .../org/fusesource/lmdbjni/ZeroCopyTest.java | 38 +- lmdbjni/src/test/resources/car.xml | 88 +++ 20 files changed, 732 insertions(+), 760 deletions(-) create mode 100644 lmdbjni/src/main/java/org/fusesource/lmdbjni/Buffers.java delete mode 100644 lmdbjni/src/main/java/org/fusesource/lmdbjni/DirectBuffer.java create mode 100644 lmdbjni/src/test/java/org/fusesource/lmdbjni/SbeTest.java create mode 100644 lmdbjni/src/test/resources/car.xml diff --git a/lmdbjni-android/pom.xml b/lmdbjni-android/pom.xml index 863a54b..bdce06f 100755 --- a/lmdbjni-android/pom.xml +++ b/lmdbjni-android/pom.xml @@ -109,6 +109,25 @@ limitations under the License. + + org.codehaus.mojo + build-helper-maven-plugin + 1.1 + + + add-test-source + generate-sources + + add-test-source + + + + ${basedir}/../lmdbjni/target/generated-sources + + + + + diff --git a/lmdbjni-linux64/pom.xml b/lmdbjni-linux64/pom.xml index 2be5f6b..a383061 100755 --- a/lmdbjni-linux64/pom.xml +++ b/lmdbjni-linux64/pom.xml @@ -84,6 +84,25 @@ limitations under the License. + + org.codehaus.mojo + build-helper-maven-plugin + 1.1 + + + add-test-source + generate-sources + + add-test-source + + + + ${basedir}/../lmdbjni/target/generated-sources + + + + + diff --git a/lmdbjni-osx64/pom.xml b/lmdbjni-osx64/pom.xml index 525acbf..e285a77 100755 --- a/lmdbjni-osx64/pom.xml +++ b/lmdbjni-osx64/pom.xml @@ -80,6 +80,25 @@ limitations under the License. + + org.codehaus.mojo + build-helper-maven-plugin + 1.1 + + + add-test-source + generate-sources + + add-test-source + + + + ${basedir}/../lmdbjni/target/generated-sources + + + + + diff --git a/lmdbjni-win64/pom.xml b/lmdbjni-win64/pom.xml index a079b00..88321ec 100644 --- a/lmdbjni-win64/pom.xml +++ b/lmdbjni-win64/pom.xml @@ -62,6 +62,25 @@ ${basedir}/../lmdbjni/target/generated-sources/hawtjni/native-package + + org.codehaus.mojo + build-helper-maven-plugin + 1.1 + + + add-test-source + generate-sources + + add-test-source + + + + ${basedir}/../lmdbjni/target/generated-sources + + + + + diff --git a/lmdbjni/pom.xml b/lmdbjni/pom.xml index 285bd82..98b9b06 100755 --- a/lmdbjni/pom.xml +++ b/lmdbjni/pom.xml @@ -45,6 +45,11 @@ hawtjni-runtime ${hawtjni-version} + + org.agrona + Agrona + 0.5 + com.google.guava guava @@ -116,6 +121,11 @@ maven-shade-plugin 2.4.1 + + + org.agrona:Agrona:*:* + + true true @@ -128,6 +138,65 @@ + + org.codehaus.mojo + build-helper-maven-plugin + 1.1 + + + add-test-source + generate-sources + + add-test-source + + + + ${project.build.directory}/generated-sources + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.3.2 + + + generate-sources + + java + + + + + false + true + + uk.co.real-logic + sbe-all + + uk.co.real_logic.sbe.SbeTool + test + + + sbe.output.dir + ${project.build.directory}/generated-sources/java + + + + ${project.build.testResources[0].directory}/car.xml + + ${project.build.directory}/generated-sources/java + + + + uk.co.real-logic + sbe-all + 1.4.0-RC4 + + + diff --git a/lmdbjni/src/main/java/org/fusesource/lmdbjni/BufferCursor.java b/lmdbjni/src/main/java/org/fusesource/lmdbjni/BufferCursor.java index 067edf5..d38d0d8 100644 --- a/lmdbjni/src/main/java/org/fusesource/lmdbjni/BufferCursor.java +++ b/lmdbjni/src/main/java/org/fusesource/lmdbjni/BufferCursor.java @@ -2,6 +2,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.agrona.MutableDirectBuffer; import static org.fusesource.lmdbjni.JNI.mdb_strerror; @@ -120,15 +121,15 @@ public class BufferCursor implements AutoCloseable { private final ByteBuffer keyByteBuffer; private ByteBuffer valueByteBuffer; private final boolean isReadOnly; - private DirectBuffer key; - private DirectBuffer value; + private MutableDirectBuffer key; + private MutableDirectBuffer value; private boolean keyDatbaseMemoryLocation = false; private boolean valDatbaseMemoryLocation = false; private int keyWriteIndex = 0; private int valWriteIndex = 0; private boolean validPosition = false; - BufferCursor(Cursor cursor, DirectBuffer key, DirectBuffer value) { + BufferCursor(Cursor cursor, MutableDirectBuffer key, MutableDirectBuffer value) { this.cursor = cursor; this.isReadOnly = cursor.isReadOnly(); if (key.byteBuffer() == null) { @@ -150,7 +151,7 @@ public class BufferCursor implements AutoCloseable { } BufferCursor(Cursor cursor, int maxValueSize) { - this(cursor, new DirectBuffer(), new DirectBuffer(ByteBuffer.allocateDirect(maxValueSize))); + this(cursor, Buffers.buffer(), Buffers.buffer(maxValueSize)); } /** @@ -333,10 +334,8 @@ public void close() { * previously existing key. */ public boolean put() { - DirectBuffer k = (keyWriteIndex != 0) ? - new DirectBuffer(key.addressOffset(), keyWriteIndex) : key; - DirectBuffer v = (valWriteIndex != 0) ? - new DirectBuffer(value.addressOffset(), valWriteIndex) : value; + MutableDirectBuffer k = (keyWriteIndex != 0) ? Buffers.bufferSlice(key, keyWriteIndex) : key; + MutableDirectBuffer v = (valWriteIndex != 0) ? Buffers.bufferSlice(value, valWriteIndex) : value; keyWriteIndex = 0; valWriteIndex = 0; int rc = cursor.put(k, v, Constants.NOOVERWRITE); @@ -355,10 +354,8 @@ public boolean put() { * previously existing key. Also used for adding duplicates. */ public boolean overwrite() { - DirectBuffer k = (keyWriteIndex != 0) ? - new DirectBuffer(key.addressOffset(), keyWriteIndex) : key; - DirectBuffer v = (valWriteIndex != 0) ? - new DirectBuffer(value.addressOffset(), valWriteIndex) : value; + MutableDirectBuffer k = (keyWriteIndex != 0) ? Buffers.bufferSlice(key, keyWriteIndex) : key; + MutableDirectBuffer v = (valWriteIndex != 0) ? Buffers.bufferSlice(value, valWriteIndex) : value; keyWriteIndex = 0; valWriteIndex = 0; int rc = cursor.put(k, v, 0); @@ -380,10 +377,8 @@ public boolean overwrite() { * data corruption. */ public void append() { - DirectBuffer k = (keyWriteIndex != 0) ? - new DirectBuffer(key.addressOffset(), keyWriteIndex) : key; - DirectBuffer v = (valWriteIndex != 0) ? - new DirectBuffer(value.addressOffset(), valWriteIndex) : value; + MutableDirectBuffer k = (keyWriteIndex != 0) ? Buffers.bufferSlice(key, keyWriteIndex) : key; + MutableDirectBuffer v = (valWriteIndex != 0) ? Buffers.bufferSlice(value, valWriteIndex) : value; keyWriteIndex = 0; valWriteIndex = 0; int rc = cursor.put(k, v, Constants.APPEND); @@ -500,7 +495,7 @@ public BufferCursor keyWriteDouble(double data) { */ public BufferCursor keyWriteUtf8(ByteString data) { setSafeKeyMemoryLocation(); - this.key.putString(keyWriteIndex, data); + Util.putString(this.key, keyWriteIndex, data); keyWriteIndex += data.size() + 1; return this; } @@ -515,7 +510,7 @@ public BufferCursor keyWriteUtf8(ByteString data) { public BufferCursor keyWriteUtf8(String data) { setSafeKeyMemoryLocation(); ByteString bytes = new ByteString(data); - this.key.putString(keyWriteIndex, bytes); + Util.putString(this.key, keyWriteIndex, bytes); keyWriteIndex += bytes.size() + 1; return this; } @@ -558,7 +553,7 @@ public BufferCursor keyWriteBytes(byte[] data, int offset, int length) { * @param capacity capacity * @return this */ - public BufferCursor keyWrite(DirectBuffer buffer, int capacity) { + public BufferCursor keyWrite(MutableDirectBuffer buffer, int capacity) { setSafeKeyMemoryLocation(); this.key.putBytes(keyWriteIndex, buffer, 0, capacity); keyWriteIndex += capacity; @@ -568,7 +563,7 @@ public BufferCursor keyWrite(DirectBuffer buffer, int capacity) { /** * @see org.fusesource.lmdbjni.BufferCursor#keyWrite(DirectBuffer, int) */ - public BufferCursor keyWrite(DirectBuffer buffer) { + public BufferCursor keyWrite(MutableDirectBuffer buffer) { keyWrite(buffer, buffer.capacity()); return this; } @@ -672,7 +667,7 @@ public double keyDouble(int pos) { */ public ByteString keyUtf8(int pos) { checkForValidPosition(); - return this.key.getString(pos); + return Util.getString(key, pos); } /** @@ -703,14 +698,14 @@ public byte[] keyBytes() { * * @return underlying buffer */ - public DirectBuffer keyBuffer() { + public MutableDirectBuffer keyBuffer() { return key; } /** * @return the key direct buffer at current position. */ - public DirectBuffer keyDirectBuffer() { + public MutableDirectBuffer keyDirectBuffer() { checkForValidPosition(); return key; } @@ -855,7 +850,7 @@ public BufferCursor valWriteUtf8(String data) { setSafeValMemoryLocation(); ByteString bytes = new ByteString(data); ensureValueWritableBytes(bytes.size() + 1); - this.value.putString(valWriteIndex, bytes); + Util.putString(this.value, valWriteIndex, bytes); valWriteIndex += bytes.size() + 1; return this; } @@ -873,7 +868,7 @@ public BufferCursor valWriteUtf8(ByteString data) { } setSafeValMemoryLocation(); ensureValueWritableBytes(data.size() + 1); - this.value.putString(valWriteIndex, data); + Util.putString(this.value, valWriteIndex, data); valWriteIndex += data.size() + 1; return this; } @@ -927,7 +922,7 @@ public BufferCursor valWriteBytes(byte[] data, int offset, int length) { * @param length how many bytes to write * @return this */ - public BufferCursor valWrite(DirectBuffer buffer, int srcIndex, int length) { + public BufferCursor valWrite(MutableDirectBuffer buffer, int srcIndex, int length) { if (isReadOnly) { throw new LMDBException("Read only transaction", LMDBException.EACCES); } @@ -941,14 +936,14 @@ public BufferCursor valWrite(DirectBuffer buffer, int srcIndex, int length) { /** * @see org.fusesource.lmdbjni.BufferCursor#valWrite(DirectBuffer, int, int) */ - public BufferCursor valWrite(DirectBuffer buffer, int length) { + public BufferCursor valWrite(MutableDirectBuffer buffer, int length) { return valWrite(buffer, 0, length); } /** * @see org.fusesource.lmdbjni.BufferCursor#valWrite(DirectBuffer, int) */ - public BufferCursor valWrite(DirectBuffer buffer) { + public BufferCursor valWrite(MutableDirectBuffer buffer) { valWrite(buffer, 0, buffer.capacity()); return this; } @@ -1056,7 +1051,7 @@ public byte[] valBytes() { * * @return underlying buffer */ - public DirectBuffer valBuffer() { + public MutableDirectBuffer valBuffer() { return value; } @@ -1091,13 +1086,13 @@ public double valDouble(int pos) { */ public ByteString valUtf8(int pos) { checkForValidPosition(); - return this.value.getString(pos); + return Util.getString(this.value, pos); } /** * @return the direct buffer at the current position. */ - public DirectBuffer valDirectBuffer() { + public MutableDirectBuffer valDirectBuffer() { checkForValidPosition(); return this.value; } diff --git a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Buffers.java b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Buffers.java new file mode 100644 index 0000000..f85d0b6 --- /dev/null +++ b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Buffers.java @@ -0,0 +1,60 @@ +package org.fusesource.lmdbjni; + +import java.nio.ByteBuffer; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.concurrent.UnsafeBuffer; + +/** + * Buffer creation utility methods. + */ +final class Buffers { + + /** + * Private constructor to prevent instantiation of utility class. + */ + private Buffers() { + } + + /** + * Obtain a new buffer of the specified capacity. + *

+ * Do not use on Android. + * + * @param capacity the number of bytes the buffer should store + * @return a buffer (never null) + */ + public static MutableDirectBuffer buffer(int capacity) { + if (capacity == 0) { + return new UnsafeBuffer(new byte[0]); + } + return new UnsafeBuffer(ByteBuffer.allocateDirect(capacity)); + } + + /** + * Obtain a new buffer large enough for an LMDB maximum sized key. The current + * maximum is provided by {@link Env#MAX_KEY_SIZE}. + *

+ * Do not use on Android. + * + * @return a buffer suitable for storing a key (never null) + */ + public static MutableDirectBuffer buffer() { + return buffer(Env.MAX_KEY_SIZE); + } + + /** + * Obtain a new buffer slice that shares the same memory as the source buffer. + *

+ * Do not use on Android. + * + * @param source the source buffer + * @param length the number of bytes the new slice should include + * @return a buffer of the request length, starting at byte 0 of the source + * (never null) + */ + public static MutableDirectBuffer bufferSlice(DirectBuffer source, int length) { + return new UnsafeBuffer(source.addressOffset(), length); + } + +} diff --git a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Cursor.java b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Cursor.java index 5f08f6e..97e8cd3 100644 --- a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Cursor.java +++ b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Cursor.java @@ -20,6 +20,8 @@ import java.nio.ByteBuffer; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import static org.fusesource.lmdbjni.JNI.*; import static org.fusesource.lmdbjni.Util.checkArgNotNull; @@ -104,7 +106,7 @@ public Entry get(GetOp op) { */ public int position(DirectBuffer key, DirectBuffer value, GetOp op) { if (buffer == null) { - buffer = new DirectBuffer(ByteBuffer.allocateDirect(Unsafe.ADDRESS_SIZE * 4)); + buffer = Buffers.buffer(Unsafe.ADDRESS_SIZE * 4); bufferAddress = buffer.addressOffset(); } checkArgNotNull(op, "op"); @@ -126,7 +128,7 @@ public int seekPosition(DirectBuffer key, DirectBuffer value, SeekOp op) { checkArgNotNull(value, "value"); checkArgNotNull(op, "op"); if (buffer == null) { - buffer = new DirectBuffer(ByteBuffer.allocateDirect(Unsafe.ADDRESS_SIZE * 4)); + buffer = Buffers.buffer(Unsafe.ADDRESS_SIZE * 4); bufferAddress = buffer.addressOffset(); } Unsafe.putLong(bufferAddress, 0, key.capacity()); @@ -248,7 +250,7 @@ public int put(DirectBuffer key, DirectBuffer value, int flags) { checkArgNotNull(key, "key"); checkArgNotNull(value, "value"); if (buffer == null) { - buffer = new DirectBuffer(ByteBuffer.allocateDirect(Unsafe.ADDRESS_SIZE * 4)); + buffer = Buffers.buffer(Unsafe.ADDRESS_SIZE * 4); bufferAddress = buffer.addressOffset(); } Unsafe.putLong(bufferAddress, 0, key.capacity()); @@ -273,13 +275,13 @@ private byte[] put(Value keySlice, Value valueSlice, int flags) { * * @return a pointer to the reserved space. */ - public DirectBuffer reserve(DirectBuffer key, int size) { + public MutableDirectBuffer reserve(DirectBuffer key, int size) { checkArgNotNull(key, "key"); if (key.byteArray() != null || !key.byteBuffer().isDirect()) { throw new IllegalArgumentException("Key buffer is not direct."); } if (buffer == null) { - buffer = new DirectBuffer(ByteBuffer.allocateDirect(Unsafe.ADDRESS_SIZE * 4)); + buffer = Buffers.buffer(Unsafe.ADDRESS_SIZE * 4); bufferAddress = buffer.addressOffset(); } Unsafe.putLong(bufferAddress, 0, key.capacity()); @@ -289,7 +291,7 @@ public DirectBuffer reserve(DirectBuffer key, int size) { checkErrorCode(rc); int valSize = (int) Unsafe.getLong(bufferAddress, 2); long valAddress = Unsafe.getAddress(bufferAddress, 3); - DirectBuffer empty = new DirectBuffer(0, 0); + MutableDirectBuffer empty = Buffers.buffer(0); empty.wrap(valAddress, valSize); return empty; } diff --git a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Database.java b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Database.java index a2ca2d5..3fef60f 100644 --- a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Database.java +++ b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Database.java @@ -22,8 +22,9 @@ import org.fusesource.hawtjni.runtime.Callback; import org.fusesource.lmdbjni.EntryIterator.IteratorType; -import java.nio.ByteBuffer; import java.util.Comparator; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import static org.fusesource.lmdbjni.JNI.*; import static org.fusesource.lmdbjni.Util.checkArgNotNull; @@ -309,7 +310,7 @@ public BufferCursor bufferCursor(Transaction tx, int maxValueSize) { * @param value A DirectBuffer must be backed by a direct ByteBuffer. * @return a closable cursor handle. */ - public BufferCursor bufferCursor(Transaction tx, DirectBuffer key, DirectBuffer value) { + public BufferCursor bufferCursor(Transaction tx, MutableDirectBuffer key, MutableDirectBuffer value) { Cursor cursor = openCursor(tx); return new BufferCursor(cursor, key, value); } @@ -362,7 +363,7 @@ public int put(Transaction tx, DirectBuffer key, DirectBuffer value, int flags) * * @return a pointer to the reserved space. */ - public DirectBuffer reserve(Transaction tx, DirectBuffer key, int size) { + public MutableDirectBuffer reserve(Transaction tx, DirectBuffer key, int size) { checkArgNotNull(key, "key"); long address = tx.getBufferAddress(); Unsafe.putLong(address, 0, key.capacity()); @@ -373,7 +374,7 @@ public DirectBuffer reserve(Transaction tx, DirectBuffer key, int size) { checkErrorCode(rc); int valSize = (int) Unsafe.getLong(address, 2); long valAddress = Unsafe.getAddress(address, 3); - DirectBuffer empty = new DirectBuffer(0, 0); + MutableDirectBuffer empty = Buffers.buffer(0); empty.wrap(valAddress, valSize); return empty; } @@ -694,13 +695,13 @@ public ByteArrayComparator(Comparator comparator) { public long compare(long ptr1, long ptr2) { int size = (int) Unsafe.getLong(ptr1, 0); long address = Unsafe.getAddress(ptr1, 1); - DirectBuffer key1 = new DirectBuffer(); + DirectBuffer key1 = Buffers.buffer(); key1.wrap(address, size); byte[] key1Bytes = new byte[size]; key1.getBytes(0, key1Bytes); size = (int) Unsafe.getLong(ptr2, 0); address = Unsafe.getAddress(ptr2, 1); - DirectBuffer key2 = new DirectBuffer(); + DirectBuffer key2 = Buffers.buffer(); key2.wrap(address, size); byte[] key2Bytes = new byte[size]; key2.getBytes(0, key2Bytes); @@ -718,11 +719,11 @@ public DirectBufferComparator(Comparator comparator) { public long compare(long ptr1, long ptr2) { int size = (int) Unsafe.getLong(ptr1, 0); long address = Unsafe.getAddress(ptr1, 1); - DirectBuffer key1 = new DirectBuffer(); + DirectBuffer key1 = Buffers.buffer(); key1.wrap(address, size); size = (int) Unsafe.getLong(ptr2, 0); address = Unsafe.getAddress(ptr2, 1); - DirectBuffer key2 = new DirectBuffer(); + DirectBuffer key2 = Buffers.buffer(); key2.wrap(address, size); return comparator.compare(key1, key2); } diff --git a/lmdbjni/src/main/java/org/fusesource/lmdbjni/DirectBuffer.java b/lmdbjni/src/main/java/org/fusesource/lmdbjni/DirectBuffer.java deleted file mode 100644 index cfdaa3b..0000000 --- a/lmdbjni/src/main/java/org/fusesource/lmdbjni/DirectBuffer.java +++ /dev/null @@ -1,678 +0,0 @@ -/* - * Copyright 2014 Real Logic Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.fusesource.lmdbjni; - -import sun.misc.Unsafe; - -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.charset.StandardCharsets; -import java.security.AccessController; -import java.security.PrivilegedExceptionAction; - -/** - * Supports regular, byte ordered, and atomic (memory ordered) access to an underlying buffer. - * The buffer can be a byte[] or one of the various {@link java.nio.ByteBuffer} implementations. - */ -public class DirectBuffer { - /** Size of a byte in bytes */ - public static final int SIZE_OF_BYTE = 1; - - /** Size of a boolean in bytes */ - public static final int SIZE_OF_BOOLEAN = 1; - - /** Size of a char in bytes */ - public static final int SIZE_OF_CHAR = 2; - - /** Size of a short in bytes */ - public static final int SIZE_OF_SHORT = 2; - - /** Size of an int in bytes */ - public static final int SIZE_OF_INT = 4; - - /** Size of a a float in bytes */ - public static final int SIZE_OF_FLOAT = 4; - - /** Size of a long in bytes */ - public static final int SIZE_OF_LONG = 8; - - /** Size of a double in bytes */ - public static final int SIZE_OF_DOUBLE = 8; - - private static final Unsafe UNSAFE; - - static { - try { - final PrivilegedExceptionAction action = new PrivilegedExceptionAction() { - public Unsafe run() throws Exception { - final Field field = Unsafe.class.getDeclaredField("theUnsafe"); - field.setAccessible(true); - return (Unsafe) field.get(null); - } - }; - - UNSAFE = AccessController.doPrivileged(action); - } catch (final Exception ex) { - throw new RuntimeException(ex); - } - } - - /** - * Get the instance of {@link sun.misc.Unsafe}. - * - * @return the instance of Unsafe - */ - public static Unsafe getUnsafe() { - return UNSAFE; - } - - public static final String DISABLE_BOUNDS_CHECKS_PROP_NAME = "directbuffer.disable.bounds.checks"; - public static final boolean SHOULD_BOUNDS_CHECK = !Boolean.getBoolean(DISABLE_BOUNDS_CHECKS_PROP_NAME); - - private static final byte[] NULL_BYTES = "null".getBytes(StandardCharsets.UTF_8); - private static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder(); - private static final long ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private byte[] byteArray; - private ByteBuffer byteBuffer; - private long addressOffset; - - private int capacity; - - /** - * Attach a view to a direct {@link java.nio.ByteBuffer}. - */ - public DirectBuffer() { - wrap(ByteBuffer.allocateDirect(511)); - } - - /** - * Attach a view to a byte[] for providing direct access. - * - * @param buffer to which the view is attached. - */ - public DirectBuffer(final byte[] buffer) { - wrap(buffer); - } - - /** - * Attach a view to a {@link java.nio.ByteBuffer} for providing direct access, the {@link java.nio.ByteBuffer} can be - * heap based or direct. - * - * @param buffer to which the view is attached. - */ - public DirectBuffer(final ByteBuffer buffer) { - wrap(buffer); - } - - /** - * Attach a view to an off-heap memory region by address. - * - * @param address where the memory begins off-heap - * @param capacity of the buffer from the given address - */ - public DirectBuffer(final long address, final int capacity) { - wrap(address, capacity); - } - - /** - * Attach a view to an existing {@link DirectBuffer} - * - * @param buffer to which the view is attached. - */ - public DirectBuffer(final DirectBuffer buffer) { - wrap(buffer); - } - - public void wrap(final byte[] buffer) { - addressOffset = ARRAY_BASE_OFFSET; - capacity = buffer.length; - byteArray = buffer; - byteBuffer = null; - } - - public void wrap(final ByteBuffer buffer) { - byteBuffer = buffer; - - if (buffer.hasArray()) { - byteArray = buffer.array(); - addressOffset = ARRAY_BASE_OFFSET + buffer.arrayOffset(); - } else { - byteArray = null; - addressOffset = ((sun.nio.ch.DirectBuffer) buffer).address(); - } - - capacity = buffer.capacity(); - } - - public void wrap(final long address, final int capacity) { - addressOffset = address; - this.capacity = capacity; - byteArray = null; - byteBuffer = null; - } - - public void wrap(final DirectBuffer buffer) { - addressOffset = buffer.addressOffset(); - capacity = buffer.capacity(); - byteArray = buffer.byteArray(); - byteBuffer = buffer.byteBuffer(); - } - - public long addressOffset() { - return addressOffset; - } - - public byte[] byteArray() { - return byteArray; - } - - public ByteBuffer byteBuffer() { - return byteBuffer; - } - - public void setMemory(final int index, final int length, final byte value) { - boundsCheck(index, length); - - UNSAFE.setMemory(byteArray, addressOffset + index, length, value); - } - - public int capacity() { - return capacity; - } - - public void checkLimit(final int limit) { - if (limit > capacity) { - final String msg = String.format("limit=%d is beyond capacity=%d", limit, capacity); - throw new IndexOutOfBoundsException(msg); - } - } - - public ByteBuffer duplicateByteBuffer() { - if (null == byteBuffer) { - return ByteBuffer.wrap(byteArray); - } else { - final ByteBuffer duplicate = byteBuffer.duplicate(); - duplicate.clear(); - - return duplicate; - } - } - - /////////////////////////////////////////////////////////////////////////// - - public long getLong(final int index, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_LONG); - - long bits = UNSAFE.getLong(byteArray, addressOffset + index); - if (NATIVE_BYTE_ORDER != byteOrder) { - bits = Long.reverseBytes(bits); - } - - return bits; - } - - public void putLong(final int index, final long value, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_LONG); - - long bits = value; - if (NATIVE_BYTE_ORDER != byteOrder) { - bits = Long.reverseBytes(bits); - } - - UNSAFE.putLong(byteArray, addressOffset + index, bits); - } - - public long getLong(final int index) { - boundsCheck(index, SIZE_OF_LONG); - - return UNSAFE.getLong(byteArray, addressOffset + index); - } - - public void putLong(final int index, final long value) { - boundsCheck(index, SIZE_OF_LONG); - - UNSAFE.putLong(byteArray, addressOffset + index, value); - } - - public long getLongVolatile(final int index) { - boundsCheck(index, SIZE_OF_LONG); - - return UNSAFE.getLongVolatile(byteArray, addressOffset + index); - } - - public void putLongVolatile(final int index, final long value) { - boundsCheck(index, SIZE_OF_LONG); - - UNSAFE.putLongVolatile(byteArray, addressOffset + index, value); - } - - public void putLongOrdered(final int index, final long value) { - boundsCheck(index, SIZE_OF_LONG); - - UNSAFE.putOrderedLong(byteArray, addressOffset + index, value); - } - - public void addLongOrdered(final int index, final long increment) { - boundsCheck(index, SIZE_OF_LONG); - - final long offset = addressOffset + index; - final byte[] byteArray = this.byteArray; - final long value = UNSAFE.getLong(byteArray, offset); - UNSAFE.putOrderedLong(byteArray, offset, value + increment); - } - - public boolean compareAndSetLong(final int index, final long expectedValue, final long updateValue) { - boundsCheck(index, SIZE_OF_LONG); - - return UNSAFE.compareAndSwapLong(byteArray, addressOffset + index, expectedValue, updateValue); - } - - /////////////////////////////////////////////////////////////////////////// - - public int getInt(final int index, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_INT); - - int bits = UNSAFE.getInt(byteArray, addressOffset + index); - if (NATIVE_BYTE_ORDER != byteOrder) { - bits = Integer.reverseBytes(bits); - } - - return bits; - } - - public void putInt(final int index, final int value, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_INT); - - int bits = value; - if (NATIVE_BYTE_ORDER != byteOrder) { - bits = Integer.reverseBytes(bits); - } - - UNSAFE.putInt(byteArray, addressOffset + index, bits); - } - - public int getInt(final int index) { - boundsCheck(index, SIZE_OF_INT); - - return UNSAFE.getInt(byteArray, addressOffset + index); - } - - public void putInt(final int index, final int value) { - boundsCheck(index, SIZE_OF_INT); - - UNSAFE.putInt(byteArray, addressOffset + index, value); - } - - public int getIntVolatile(final int index) { - boundsCheck(index, SIZE_OF_INT); - - return UNSAFE.getIntVolatile(byteArray, addressOffset + index); - } - - public void putIntVolatile(final int index, final int value) { - boundsCheck(index, SIZE_OF_INT); - - UNSAFE.putIntVolatile(byteArray, addressOffset + index, value); - } - - public void putIntOrdered(final int index, final int value) { - boundsCheck(index, SIZE_OF_INT); - - UNSAFE.putOrderedInt(byteArray, addressOffset + index, value); - } - - public void addIntOrdered(final int index, final int increment) { - boundsCheck(index, SIZE_OF_INT); - - final long offset = addressOffset + index; - final byte[] byteArray = this.byteArray; - final int value = UNSAFE.getInt(byteArray, offset); - UNSAFE.putOrderedInt(byteArray, offset, value + increment); - } - - public boolean compareAndSetInt(final int index, final int expectedValue, final int updateValue) { - boundsCheck(index, SIZE_OF_INT); - - return UNSAFE.compareAndSwapInt(byteArray, addressOffset + index, expectedValue, updateValue); - } - - /////////////////////////////////////////////////////////////////////////// - - public double getDouble(final int index, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_DOUBLE); - - if (NATIVE_BYTE_ORDER != byteOrder) { - final long bits = UNSAFE.getLong(byteArray, addressOffset + index); - return Double.longBitsToDouble(Long.reverseBytes(bits)); - } else { - return UNSAFE.getDouble(byteArray, addressOffset + index); - } - } - - public void putDouble(final int index, final double value, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_DOUBLE); - - if (NATIVE_BYTE_ORDER != byteOrder) { - final long bits = Long.reverseBytes(Double.doubleToRawLongBits(value)); - UNSAFE.putLong(byteArray, addressOffset + index, bits); - } else { - UNSAFE.putDouble(byteArray, addressOffset + index, value); - } - } - - public double getDouble(final int index) { - boundsCheck(index, SIZE_OF_DOUBLE); - - return UNSAFE.getDouble(byteArray, addressOffset + index); - } - - public void putDouble(final int index, final double value) { - boundsCheck(index, SIZE_OF_DOUBLE); - - UNSAFE.putDouble(byteArray, addressOffset + index, value); - } - - /////////////////////////////////////////////////////////////////////////// - - public float getFloat(final int index, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_FLOAT); - - if (NATIVE_BYTE_ORDER != byteOrder) { - final int bits = UNSAFE.getInt(byteArray, addressOffset + index); - return Float.intBitsToFloat(Integer.reverseBytes(bits)); - } else { - return UNSAFE.getFloat(byteArray, addressOffset + index); - } - } - - public void putFloat(final int index, final float value, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_FLOAT); - - if (NATIVE_BYTE_ORDER != byteOrder) { - final int bits = Integer.reverseBytes(Float.floatToRawIntBits(value)); - UNSAFE.putLong(byteArray, addressOffset + index, bits); - } else { - UNSAFE.putFloat(byteArray, addressOffset + index, value); - } - } - - public float getFloat(final int index) { - boundsCheck(index, SIZE_OF_FLOAT); - - return UNSAFE.getFloat(byteArray, addressOffset + index); - } - - public void putFloat(final int index, final float value) { - boundsCheck(index, SIZE_OF_FLOAT); - - UNSAFE.putFloat(byteArray, addressOffset + index, value); - } - - /////////////////////////////////////////////////////////////////////////// - - public short getShort(final int index, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_SHORT); - - short bits = UNSAFE.getShort(byteArray, addressOffset + index); - if (NATIVE_BYTE_ORDER != byteOrder) { - bits = Short.reverseBytes(bits); - } - - return bits; - } - - public void putShort(final int index, final short value, final ByteOrder byteOrder) { - boundsCheck(index, SIZE_OF_SHORT); - - short bits = value; - if (NATIVE_BYTE_ORDER != byteOrder) { - bits = Short.reverseBytes(bits); - } - - UNSAFE.putShort(byteArray, addressOffset + index, bits); - } - - public short getShort(final int index) { - boundsCheck(index, SIZE_OF_SHORT); - - return UNSAFE.getShort(byteArray, addressOffset + index); - } - - public void putShort(final int index, final short value) { - boundsCheck(index, SIZE_OF_SHORT); - - UNSAFE.putShort(byteArray, addressOffset + index, value); - } - - public short getShortVolatile(final int index) { - boundsCheck(index, SIZE_OF_SHORT); - - return UNSAFE.getShortVolatile(byteArray, addressOffset + index); - } - - public void putShortVolatile(final int index, final short value) { - boundsCheck(index, SIZE_OF_SHORT); - - UNSAFE.putShortVolatile(byteArray, addressOffset + index, value); - } - - /////////////////////////////////////////////////////////////////////////// - - public byte getByte(final int index) { - boundsCheck(index, SIZE_OF_BYTE); - - return UNSAFE.getByte(byteArray, addressOffset + index); - } - - public void putByte(final int index, final byte value) { - boundsCheck(index, SIZE_OF_BYTE); - - UNSAFE.putByte(byteArray, addressOffset + index, value); - } - - public int getBytes(final int index, final byte[] dst) { - return getBytes(index, dst, 0, dst.length); - } - - public int getBytes(final int index, final byte[] dst, final int offset, final int length) { - int count = Math.min(length, capacity - index); - count = Math.min(count, dst.length); - - boundsCheck(index, count); - - UNSAFE.copyMemory(byteArray, addressOffset + index, dst, ARRAY_BASE_OFFSET + offset, count); - - return count; - } - - public void getBytes(final int index, final DirectBuffer dstBuffer, final int dstIndex, final int length) { - dstBuffer.putBytes(dstIndex, this, index, length); - } - - public int getBytes(final int index, final ByteBuffer dstBuffer, final int length) { - int count = Math.min(dstBuffer.remaining(), capacity - index); - count = Math.min(count, length); - - boundsCheck(index, count); - - final int dstOffset = dstBuffer.position(); - final byte[] dstByteArray; - final long dstBaseOffset; - if (dstBuffer.hasArray()) { - dstByteArray = dstBuffer.array(); - dstBaseOffset = ARRAY_BASE_OFFSET + dstBuffer.arrayOffset(); - } else { - dstByteArray = null; - dstBaseOffset = ((sun.nio.ch.DirectBuffer) dstBuffer).address(); - } - - UNSAFE.copyMemory(byteArray, addressOffset + index, dstByteArray, dstBaseOffset + dstOffset, count); - dstBuffer.position(dstBuffer.position() + count); - - return count; - } - - public int putBytes(final int index, final byte[] src) { - return putBytes(index, src, 0, src.length); - } - - public int putBytes(final int index, final byte[] src, final int offset, final int length) { - int count = Math.min(length, capacity - index); - count = Math.min(count, src.length); - - boundsCheck(index, count); - - UNSAFE.copyMemory(src, ARRAY_BASE_OFFSET + offset, byteArray, addressOffset + index, length); - - return length; - } - - public int putBytes(final int index, final ByteBuffer srcBuffer, final int length) { - int count = Math.min(srcBuffer.remaining(), length); - count = Math.min(count, capacity - index); - - boundsCheck(index, count); - - count = putBytes(index, srcBuffer, srcBuffer.position(), count); - srcBuffer.position(srcBuffer.position() + count); - - return count; - } - - public int putBytes(final int index, final ByteBuffer srcBuffer, final int srcIndex, final int length) { - int count = Math.min(length, capacity - index); - count = Math.min(count, srcBuffer.capacity() - srcIndex); - - boundsCheck(index, count); - - final byte[] srcByteArray; - final long srcBaseOffset; - if (srcBuffer.hasArray()) { - srcByteArray = srcBuffer.array(); - srcBaseOffset = ARRAY_BASE_OFFSET + srcBuffer.arrayOffset() + srcIndex; - } else { - srcByteArray = null; - srcBaseOffset = ((sun.nio.ch.DirectBuffer) srcBuffer).address(); - } - - UNSAFE.copyMemory(srcByteArray, srcBaseOffset + srcIndex, byteArray, addressOffset + index, count); - - return count; - } - - public void putBytes(final int index, final DirectBuffer srcBuffer, final int srcIndex, final int length) { - boundsCheck(index, length); - srcBuffer.boundsCheck(srcIndex, length); - - UNSAFE.copyMemory( - srcBuffer.byteArray(), - srcBuffer.addressOffset() + srcIndex, - byteArray, - addressOffset + index, - length); - } - - /////////////////////////////////////////////////////////////////////////// - - public String getStringUtf8(final int offset, final ByteOrder byteOrder) { - final int length = getInt(offset, byteOrder); - - return getStringUtf8(offset, length); - } - - public String getStringUtf8(final int offset, final int length) { - final byte[] stringInBytes = new byte[length]; - getBytes(offset + SIZE_OF_INT, stringInBytes); - - return new String(stringInBytes, StandardCharsets.UTF_8); - } - - public int putStringUtf8(final int offset, final String value, final ByteOrder byteOrder, final int maxEncodedSize) { - final byte[] bytes = value != null ? value.getBytes(StandardCharsets.UTF_8) : NULL_BYTES; - if (bytes.length > maxEncodedSize) { - throw new IllegalArgumentException("Encoded string larger than maximum size: " + maxEncodedSize); - } - - putInt(offset, bytes.length, byteOrder); - putBytes(offset + SIZE_OF_INT, bytes); - - return SIZE_OF_INT + bytes.length; - } - - public String getStringWithoutLengthUtf8(final int offset, final int length) { - final byte[] stringInBytes = new byte[length]; - getBytes(offset, stringInBytes); - - return new String(stringInBytes, StandardCharsets.UTF_8); - } - - public int putStringWithoutLengthUtf8(final int offset, final String value) { - final byte[] bytes = value != null ? value.getBytes(StandardCharsets.UTF_8) : NULL_BYTES; - putBytes(offset, bytes); - - return bytes.length; - } - - /////////////////////////////////////////////////////////////////////////// - - public void boundsCheck(final int index, final int length) { - if (SHOULD_BOUNDS_CHECK) { - if (index < 0 || length < 0 || (index + length) > capacity) { - throw new IndexOutOfBoundsException(String.format("index=%d, length=%d, capacity=%d", index, length, capacity)); - } - } - } - - public ByteString getString(final int offset) { - byte terminator = 1; - int index = offset; - while (terminator != (byte)0x00) { - boundsCheck(index, SIZE_OF_BYTE); - terminator = UNSAFE.getByte(byteArray, addressOffset + index++); - } - byte[] bytes = new byte[index - offset - 1]; - getBytes(offset, bytes); - return new ByteString(bytes); - } - - public void putString(final int offset, final ByteString value) { - final byte[] bytes = value.getBytes(); - putBytes(offset, bytes); - putByte(offset + bytes.length, (byte)0x00); - } - - public boolean getBoolean(int offset) { - boundsCheck(offset, SIZE_OF_BYTE); - return UNSAFE.getBoolean(byteArray, addressOffset + offset); - } - - public void putBoolean(int index, boolean value) { - boundsCheck(index, SIZE_OF_BOOLEAN); - UNSAFE.putBoolean(byteArray, addressOffset + index, value); - } - - public char getChar(int offset) { - boundsCheck(offset, SIZE_OF_CHAR); - return UNSAFE.getChar(byteArray, addressOffset + offset); - } - - public void putChar(int index, char value) { - boundsCheck(index, SIZE_OF_INT); - UNSAFE.putInt(byteArray, addressOffset + index, value); - } -} \ No newline at end of file diff --git a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Env.java b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Env.java index 2484474..2c2840a 100644 --- a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Env.java +++ b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Env.java @@ -28,6 +28,8 @@ */ public class Env extends NativeObject implements AutoCloseable { + public static final int MAX_KEY_SIZE = 511; + public static String version() { return string(JNI.MDB_VERSION_STRING); } diff --git a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Transaction.java b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Transaction.java index b847116..59c8044 100644 --- a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Transaction.java +++ b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Transaction.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.nio.ByteBuffer; +import org.agrona.DirectBuffer; import static org.fusesource.lmdbjni.JNI.*; import static org.fusesource.lmdbjni.Util.checkErrorCode; @@ -131,7 +132,7 @@ public boolean isReadOnly() { long getBufferAddress() { if (buffer == null) { - buffer = new DirectBuffer(ByteBuffer.allocateDirect(Unsafe.ADDRESS_SIZE * 4)); + buffer = Buffers.buffer(Unsafe.ADDRESS_SIZE * 4); } return buffer.addressOffset(); } diff --git a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Util.java b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Util.java index ac58e0c..aa46ea9 100644 --- a/lmdbjni/src/main/java/org/fusesource/lmdbjni/Util.java +++ b/lmdbjni/src/main/java/org/fusesource/lmdbjni/Util.java @@ -19,6 +19,8 @@ package org.fusesource.lmdbjni; import java.nio.charset.Charset; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import static org.fusesource.lmdbjni.JNI.mdb_strerror; import static org.fusesource.lmdbjni.JNI.strlen; @@ -59,4 +61,38 @@ static boolean isAndroid() { return false; } } + + /** + * Fetches a C-style null terminated char[] from the specified buffer, returning + * a {@link ByteString} representation. + * + * @param buffer to fetch the string from + * @param offset index within the buffer to commence the null character search + * @return the located string + */ + public static ByteString getString(final DirectBuffer buffer, + final int offset) { + byte terminator = 1; + int index = offset; + while (terminator != (byte) 0) { + terminator = buffer.getByte(index++); + } + byte[] bytes = new byte[index - offset - 1]; + buffer.getBytes(offset, bytes); + return new ByteString(bytes); + } + + /** + * Stores a C-style null terminated char[] in the specified buffer. + * + * @param buffer to store the string in + * @param offset index within the buffer to commence writing the value + * @param value to store (should not contain any null character) + */ + public static void putString(final MutableDirectBuffer buffer, + final int offset, final ByteString value) { + final byte[] bytes = value.getBytes(); + buffer.putBytes(offset, bytes); + buffer.putByte(offset + bytes.length, (byte) 0); + } } diff --git a/lmdbjni/src/test/java/org/fusesource/lmdbjni/BufferCursorTest.java b/lmdbjni/src/test/java/org/fusesource/lmdbjni/BufferCursorTest.java index c23bc41..a51fd0d 100644 --- a/lmdbjni/src/test/java/org/fusesource/lmdbjni/BufferCursorTest.java +++ b/lmdbjni/src/test/java/org/fusesource/lmdbjni/BufferCursorTest.java @@ -7,11 +7,11 @@ import org.junit.rules.TemporaryFolder; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; +import org.agrona.MutableDirectBuffer; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.*; @@ -382,13 +382,13 @@ public void testAppend() { @Test public void testDirectBuffer() { - DirectBuffer key = new DirectBuffer(ByteBuffer.allocateDirect(10)); - DirectBuffer value = new DirectBuffer(ByteBuffer.allocateDirect(10)); + MutableDirectBuffer key = Buffers.buffer(10); + MutableDirectBuffer value = Buffers.buffer(10); try (Transaction tx = env.createWriteTransaction()) { try (BufferCursor cursor = db.bufferCursor(tx, key, value)) { cursor.setWriteMode(); - key.putString(0, new ByteString("a")); - value.putString(0, new ByteString("a")); + Util.putString(key, 0, new ByteString("a")); + Util.putString(value, 0, new ByteString("a")); cursor.put(); cursor.last(); assertThat(cursor.keyUtf8(0).getString(), is("a")); @@ -396,8 +396,8 @@ public void testDirectBuffer() { } tx.commit(); } - key = new DirectBuffer(ByteBuffer.allocateDirect(10)); - value = new DirectBuffer(ByteBuffer.allocateDirect(10)); + key = Buffers.buffer(10); + value = Buffers.buffer(10); try (Transaction tx = env.createReadTransaction()) { try (BufferCursor cursor = db.bufferCursor(tx, key, value)) { assertTrue(cursor.last()); @@ -408,7 +408,7 @@ public void testDirectBuffer() { assertThat(cursor.valUtf8(0).getString(), is("a")); cursor.setWriteMode(); // Position at key = 'Z\0' - key.putString(0, new ByteString("Z")); + Util.putString(key, 0, new ByteString("Z")); assertThat(cursor.seekKey(), is(false)); // Position at key >= 'Z\0' assertThat(cursor.seekRange(), is(true)); @@ -695,7 +695,7 @@ public void testValueBufferExpansionDirectBuffer() { try (Transaction tx = env.createWriteTransaction()) { try (BufferCursor cursor = db.bufferCursor(tx, 1)) { cursor.first(); - DirectBuffer directBuffer = new DirectBuffer(ByteBuffer.allocateDirect(10)); + MutableDirectBuffer directBuffer = Buffers.buffer(10); directBuffer.putLong(0, 111L); cursor.keyWriteByte(111) .valWrite(directBuffer, 8) @@ -709,7 +709,7 @@ public void testValueBufferExpansionDirectBuffer() { try (BufferCursor cursor = db.bufferCursor(tx)) { cursor.last(); assertThat(cursor.keyByte(0), is((byte) 111)); - DirectBuffer directBuffer = cursor.valDirectBuffer(); + MutableDirectBuffer directBuffer = cursor.valDirectBuffer(); assertThat(directBuffer.capacity(), is(8 * 3)); assertThat(directBuffer.getLong(0), is(111L)); assertThat(directBuffer.getLong(8), is(111L)); @@ -765,13 +765,13 @@ public void testWriteBuffers() { try (Transaction tx = env.createWriteTransaction()) { try (BufferCursor cursor = db.bufferCursor(tx)) { cursor.first(); - DirectBuffer key = new DirectBuffer(); - key.putString(0, stringKey); + MutableDirectBuffer key = Buffers.buffer(); + Util.putString(key, 0, stringKey); // remember NULL byte cursor.keyWrite(key, stringKey.size() + 1); cursor.valWriteInt(12); - DirectBuffer value = new DirectBuffer(); - value.putString(0, stringValue); + MutableDirectBuffer value = Buffers.buffer(); + Util.putString(value, 0, stringValue); // remember NULL byte cursor.valWrite(value, stringValue.size() + 1); cursor.put(); @@ -803,7 +803,7 @@ public void testWriteToReadOnlyBuffer() { cursor.keyWriteUtf8(""); cursor.keyWriteUtf8(new ByteString("")); cursor.keyWriteBytes(new byte[]{0}); - cursor.keyWrite(new DirectBuffer(new byte[0]), 0); + cursor.keyWrite(Buffers.buffer(0)); assertEACCES(new Runnable() { public void run() { cursor.valWriteByte(0); }}); assertEACCES(new Runnable() { public void run() { cursor.valWriteShort((short) 0); }}); assertEACCES(new Runnable() { public void run() { cursor.valWriteInt(0); }}); @@ -813,7 +813,7 @@ public void testWriteToReadOnlyBuffer() { assertEACCES(new Runnable() { public void run() { cursor.valWriteUtf8(""); }}); assertEACCES(new Runnable() { public void run() { cursor.valWriteUtf8(new ByteString("")); }}); assertEACCES(new Runnable() { public void run() { cursor.valWriteBytes(new byte[]{0});}}); - assertEACCES(new Runnable() { public void run() { cursor.valWrite(new DirectBuffer(new byte[0]), 0);}}); + assertEACCES(new Runnable() { public void run() { cursor.valWrite(Buffers.buffer(0));}}); } } } diff --git a/lmdbjni/src/test/java/org/fusesource/lmdbjni/ComparatorTest.java b/lmdbjni/src/test/java/org/fusesource/lmdbjni/ComparatorTest.java index 0708661..fda7bfc 100644 --- a/lmdbjni/src/test/java/org/fusesource/lmdbjni/ComparatorTest.java +++ b/lmdbjni/src/test/java/org/fusesource/lmdbjni/ComparatorTest.java @@ -8,6 +8,8 @@ import java.io.IOException; import java.util.Comparator; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.*; diff --git a/lmdbjni/src/test/java/org/fusesource/lmdbjni/DatabaseTest.java b/lmdbjni/src/test/java/org/fusesource/lmdbjni/DatabaseTest.java index dc3c14b..7a196a8 100644 --- a/lmdbjni/src/test/java/org/fusesource/lmdbjni/DatabaseTest.java +++ b/lmdbjni/src/test/java/org/fusesource/lmdbjni/DatabaseTest.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import org.agrona.MutableDirectBuffer; import static org.fusesource.lmdbjni.Bytes.fromLong; import static org.hamcrest.CoreMatchers.is; @@ -88,7 +89,7 @@ public void testStat() { @Test public void testDeleteBuffer() { db.put(new byte[]{1}, new byte[]{1}); - DirectBuffer key = new DirectBuffer(ByteBuffer.allocateDirect(1)); + MutableDirectBuffer key = Buffers.buffer(1); key.putByte(0, (byte) 1); db.delete(key); assertNull(db.get(new byte[]{1})); diff --git a/lmdbjni/src/test/java/org/fusesource/lmdbjni/EnvTest.java b/lmdbjni/src/test/java/org/fusesource/lmdbjni/EnvTest.java index 14497d4..072e015 100644 --- a/lmdbjni/src/test/java/org/fusesource/lmdbjni/EnvTest.java +++ b/lmdbjni/src/test/java/org/fusesource/lmdbjni/EnvTest.java @@ -141,7 +141,7 @@ public void testMaxKeySize() throws Exception { String path = tmp.newFolder().getCanonicalPath(); try (Env env = new Env()) { env.open(path); - assertThat(env.getMaxKeySize(), is(511L)); + assertThat(env.getMaxKeySize(), is((long)Env.MAX_KEY_SIZE)); } } diff --git a/lmdbjni/src/test/java/org/fusesource/lmdbjni/SbeTest.java b/lmdbjni/src/test/java/org/fusesource/lmdbjni/SbeTest.java new file mode 100644 index 0000000..b03ef9a --- /dev/null +++ b/lmdbjni/src/test/java/org/fusesource/lmdbjni/SbeTest.java @@ -0,0 +1,315 @@ +package org.fusesource.lmdbjni; + +import baseline.*; +import baseline.CarDecoder.FuelFiguresDecoder; +import baseline.CarDecoder.PerformanceFiguresDecoder; +import baseline.CarDecoder.PerformanceFiguresDecoder.AccelerationDecoder; +import java.io.IOException; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import org.agrona.MutableDirectBuffer; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.IsNot.not; +import static org.hamcrest.core.IsNull.nullValue; +import static org.junit.Assert.assertThat; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class SbeTest { + + // buffers and flyweights + private static final int MAX_VAL_SIZE = 4096; + private static final MutableDirectBuffer MDB_KEY = Buffers.buffer(0); + private static final MutableDirectBuffer MDB_VAL = Buffers.buffer(0); + private static final MessageHeaderDecoder MESSAGE_HEADER_DECODER = new MessageHeaderDecoder(); + private static final MessageHeaderEncoder MESSAGE_HEADER_ENCODER = new MessageHeaderEncoder(); + private static final CarDecoder CAR_DECODER = new CarDecoder(); + private static final CarEncoder CAR_ENCODER = new CarEncoder(); + + // static test data (for consistency across storage and verification steps) + private static final byte[] VEHICLE_CODE; + private static final byte[] MANUFACTURER_CODE; + private static final byte[] MAKE; + private static final byte[] MODEL; + private static final int MODEL_YEAR = 2013; + private static final MutableDirectBuffer ACTIVATION_CODE; + private static final BooleanType AVAILABLE = BooleanType.T; + private static final Model CODE = Model.A; + private static final int CAPACITY = 2000; + + // test serial number generation (to ensure Endian byte boundary crossed) + private static final int SN_BEGIN = 250; + private static final int SN_UNTIL = 260; + + static { + try { + VEHICLE_CODE = "abcdef".getBytes(CarEncoder.vehicleCodeCharacterEncoding()); + MANUFACTURER_CODE = "123".getBytes(EngineEncoder.manufacturerCodeCharacterEncoding()); + MAKE = "Honda".getBytes(CarEncoder.makeCharacterEncoding()); + MODEL = "Civic VTi".getBytes(CarEncoder.modelCharacterEncoding()); + final byte[] code = "abcdef".getBytes(CarEncoder.activationCodeCharacterEncoding()); + ACTIVATION_CODE = Buffers.buffer(code.length); + ACTIVATION_CODE.putBytes(0, code); + } catch (final UnsupportedEncodingException ex) { + throw new RuntimeException(ex); + } + } + + private void resetBuffers() { + MDB_KEY.wrap(Buffers.buffer(Long.BYTES).byteBuffer()); + MDB_VAL.wrap(Buffers.buffer(MAX_VAL_SIZE).byteBuffer()); + } + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + @Before + public void before() { + resetBuffers(); + } + + @Test + public void verifySbeBufferReuseWithoutAnyLmdbInvolvement() { + for (int sn = SN_UNTIL; sn >= SN_BEGIN; sn--) { + encodeSbe(sn); + decodeSbe(sn); + } + } + + @Test(expected = AssertionError.class) + public void verifySbeDecodeReallyVerifiesTheSerialNumber() { + encodeSbe(100); + decodeSbe(99); // wrong SN + } + + /** + * Exemplifies how to share zero copy buffers between LMDB and SBE. + *

+ * For simplicity we are using a {@link Long} key here. It is feasible to use + * an SBE-encoded key instead, but it would over-complicate this test (as we + * would need an extra SBE decoder flyweight and encoder flyweight). If you + * intend to encode your keys via SBE, carefully consider (a) the byte order + * and (b) the effect of SBE composites (eg messageHeader, + * varDataEncoding, groupSizeEncoding) in the keys. + * These factors are likely to yield keys in a different order than you + * probably want. If storing SBE-encoded keys you might prefer to declare the + * SBE schema as Big Endian byte order, and rely on either SBE composites or + * very simple message types (without any requirement for headers, versions, + * variable length encoded types etc). + * + * @throws IOException + */ + @Test + public void verifySbeValuesStoreViaBufferCursor() throws IOException { + try (Env env = new Env()) { + env.setMapSize(10, ByteUnit.MEBIBYTES); + env.open(tmp.newFolder().getCanonicalPath()); + Env.pushMemoryPool(1024); + + try (Database db = env.openDatabase("cars");) { + for (int sn = SN_UNTIL; sn >= SN_BEGIN; sn--) { + encodeSbe(sn); + MDB_KEY.putLong(0, sn, ByteOrder.BIG_ENDIAN); + try (Transaction tx = env.createWriteTransaction()) { + try (BufferCursor cursor = db.bufferCursor(tx, MDB_KEY, MDB_VAL)) { + cursor.setWriteMode(); + boolean result = cursor.overwrite(); + assertThat(result, is(true)); + } + tx.commit(); + } + } + + readUsingCursorNext(env, db); + readUsingCursorPrev(env, db); + } + + Env.popMemoryPool(); + } + } + + private void readUsingCursorNext(Env env, Database db) { + resetBuffers(); + try (Transaction tx = env.createReadTransaction();) { + try (BufferCursor cursor = db.bufferCursor(tx, MDB_KEY, MDB_VAL);) { + cursor.setWriteMode(); + assertThat(MDB_KEY.byteBuffer(), not(nullValue())); + long sn = SN_BEGIN; + MDB_KEY.putLong(0, sn, ByteOrder.BIG_ENDIAN); // legal before seek + assertThat(cursor.seekKey(), is(true)); // MDB_* now read-only + assertThat(cursor.keyLength(), not(0)); + assertThat(cursor.valLength(), not(0)); + decodeSbe(sn); + + // iterate remainder + int done = 1; + while (cursor.next()) { + sn = SN_BEGIN + done; + assertThat(MDB_KEY.getLong(0, ByteOrder.BIG_ENDIAN), is(sn)); + assertThat(cursor.keyLength(), not(0)); + assertThat(cursor.valLength(), not(0)); + decodeSbe(sn); + done++; + } + assertThat(done, is(SN_UNTIL - SN_BEGIN + 1)); + + // some extra seeks + sn = SN_BEGIN; + assertThat(cursor.first(), is(true)); + assertThat(MDB_KEY.getLong(0, ByteOrder.BIG_ENDIAN), is(sn)); + decodeSbe(sn); + + sn = SN_UNTIL; + assertThat(cursor.last(), is(true)); + assertThat(MDB_KEY.getLong(0, ByteOrder.BIG_ENDIAN), is(sn)); + decodeSbe(sn); + } + } + } + + private void readUsingCursorPrev(Env env, Database db) { + resetBuffers(); + try (Transaction tx = env.createReadTransaction();) { + try (BufferCursor cursor = db.bufferCursor(tx, MDB_KEY, MDB_VAL);) { + cursor.setWriteMode(); + assertThat(MDB_KEY.byteBuffer(), not(nullValue())); + long sn = SN_UNTIL; + MDB_KEY.putLong(0, sn, ByteOrder.BIG_ENDIAN); // legal before seek + assertThat(cursor.seekKey(), is(true)); // MDB_* now read-only + assertThat(cursor.keyLength(), not(0)); + assertThat(cursor.valLength(), not(0)); + decodeSbe(sn); + + // iterate remainder + int done = 1; + while (cursor.prev()) { + sn = SN_UNTIL - done; + assertThat(MDB_KEY.getLong(0, ByteOrder.BIG_ENDIAN), is(sn)); + assertThat(cursor.keyLength(), not(0)); + assertThat(cursor.valLength(), not(0)); + decodeSbe(sn); + done++; + } + assertThat(done, is(SN_UNTIL - SN_BEGIN + 1)); + } + } + } + + private void encodeSbe(long serialNumber) { + int bufferOffset = 0; + MESSAGE_HEADER_ENCODER + .wrap(MDB_VAL, bufferOffset) + .blockLength(CAR_ENCODER.sbeBlockLength()) + .templateId(CAR_ENCODER.sbeTemplateId()) + .schemaId(CAR_ENCODER.sbeSchemaId()) + .version(CAR_ENCODER.sbeSchemaVersion()); + + bufferOffset += MESSAGE_HEADER_ENCODER.encodedLength(); + + CAR_ENCODER.wrap(MDB_VAL, bufferOffset); + + final int srcOffset = 0; + CAR_ENCODER.serialNumber(serialNumber) + .modelYear(MODEL_YEAR) + .available(AVAILABLE) + .code(CODE) + .putVehicleCode(VEHICLE_CODE, srcOffset); + + for (int i = 0, size = CarEncoder.someNumbersLength(); i < size; i++) { + CAR_ENCODER.someNumbers(i, i); + } + + CAR_ENCODER.extras() + .clear() + .cruiseControl(true) + .sportsPack(true) + .sunRoof(false); + + CAR_ENCODER.engine() + .capacity(CAPACITY) + .numCylinders((short) 4) + .putManufacturerCode(MANUFACTURER_CODE, srcOffset) + .booster().boostType(BoostType.NITROUS).horsePower((short) 200); + + CAR_ENCODER.fuelFiguresCount(3) + .next().speed(30).mpg(35.9f).usageDescription("Urban Cycle") + .next().speed(55).mpg(49.0f).usageDescription("Combined Cycle") + .next().speed(75).mpg(40.0f).usageDescription("Highway Cycle"); + + final CarEncoder.PerformanceFiguresEncoder perfFigures = CAR_ENCODER.performanceFiguresCount(2); + perfFigures.next() + .octaneRating((short) 95) + .accelerationCount(3) + .next().mph(30).seconds(4.0f) + .next().mph(60).seconds(7.5f) + .next().mph(100).seconds(12.2f); + perfFigures.next() + .octaneRating((short) 99) + .accelerationCount(3) + .next().mph(30).seconds(3.8f) + .next().mph(60).seconds(7.1f) + .next().mph(100).seconds(11.8f); + + CAR_ENCODER.make(new String(MAKE, StandardCharsets.UTF_8)) + .putModel(MODEL, srcOffset, MODEL.length) + .putActivationCode(ACTIVATION_CODE, 0, ACTIVATION_CODE.capacity()); + } + + private void decodeSbe(long serialNumber) { + int bufferOffset = 0; + MESSAGE_HEADER_DECODER.wrap(MDB_VAL, bufferOffset); + + final int templateId = MESSAGE_HEADER_DECODER.templateId(); + assertThat(templateId, is(CarEncoder.TEMPLATE_ID)); + + final int actingBlockLength = MESSAGE_HEADER_DECODER.blockLength(); + assertThat(actingBlockLength, is(CarEncoder.BLOCK_LENGTH)); + + final int schemaId = MESSAGE_HEADER_DECODER.schemaId(); + assertThat(schemaId, is(CarEncoder.SCHEMA_ID)); + + final int actingVersion = MESSAGE_HEADER_DECODER.version(); + assertThat(actingVersion, is(CarEncoder.SCHEMA_VERSION)); + + bufferOffset += MESSAGE_HEADER_DECODER.encodedLength(); + CAR_DECODER.wrap(MDB_VAL, bufferOffset, actingBlockLength, actingVersion); + + final byte[] buffer = new byte[128]; + + assertThat(CAR_DECODER.serialNumber(), is(serialNumber)); + assertThat(CAR_DECODER.modelYear(), is(MODEL_YEAR)); + assertThat(CAR_DECODER.available(), is(AVAILABLE)); + assertThat(CAR_DECODER.code(), is(CODE)); + + for (int i = 0, size = CarEncoder.someNumbersLength(); i < size; i++) { + assertThat((int) CAR_DECODER.someNumbers(i), is(i)); + } + + final OptionalExtrasDecoder extras = CAR_DECODER.extras(); + assertThat(extras.cruiseControl(), is(true)); + assertThat(extras.sportsPack(), is(true)); + assertThat(extras.sunRoof(), is(false)); + + final EngineDecoder engine = CAR_DECODER.engine(); + assertThat(engine.capacity(), is(CAPACITY)); + + for (final FuelFiguresDecoder fuelFigures : CAR_DECODER.fuelFigures()) { + assertThat(fuelFigures.count(), is(3)); + fuelFigures.usageDescription(); + } + + for (final PerformanceFiguresDecoder perfFigures : CAR_DECODER.performanceFigures()) { + perfFigures.octaneRating(); + for (AccelerationDecoder acceleration : perfFigures.acceleration()) { + acceleration.mph(); + } + assertThat(perfFigures.count(), is(2)); + } + + assertThat(CAR_DECODER.make(), is(new String(MAKE, StandardCharsets.UTF_8))); + } +} diff --git a/lmdbjni/src/test/java/org/fusesource/lmdbjni/ZeroCopyTest.java b/lmdbjni/src/test/java/org/fusesource/lmdbjni/ZeroCopyTest.java index 7cb5727..69274ab 100644 --- a/lmdbjni/src/test/java/org/fusesource/lmdbjni/ZeroCopyTest.java +++ b/lmdbjni/src/test/java/org/fusesource/lmdbjni/ZeroCopyTest.java @@ -9,6 +9,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.agrona.MutableDirectBuffer; import static org.fusesource.lmdbjni.Constants.FIRST; import static org.fusesource.lmdbjni.Constants.NEXT; @@ -29,10 +30,10 @@ public class ZeroCopyTest { private Database db; private Env env; - private DirectBuffer k1 = new DirectBuffer(); - private DirectBuffer v1 = new DirectBuffer(); - private DirectBuffer k2 = new DirectBuffer(); - private DirectBuffer v2 = new DirectBuffer(); + private MutableDirectBuffer k1 = Buffers.buffer(); + private MutableDirectBuffer v1 = Buffers.buffer(); + private MutableDirectBuffer k2 = Buffers.buffer(); + private MutableDirectBuffer v2 = Buffers.buffer(); @Before public void before() throws IOException { @@ -58,8 +59,8 @@ public void testPutAndGetAndDelete() throws Exception { db.put(k1, v1); db.put(k2, v2); - DirectBuffer k = new DirectBuffer(); - DirectBuffer v = new DirectBuffer(); + MutableDirectBuffer k = Buffers.buffer(); + MutableDirectBuffer v = Buffers.buffer(); k.putLong(0, 10); db.get(k, v); assertThat(v.getLong(0), is(11L)); @@ -87,8 +88,8 @@ public void testCursorPutAndGet() throws Exception { tx.commit(); } - DirectBuffer k = new DirectBuffer(); - DirectBuffer v = new DirectBuffer(); + MutableDirectBuffer k = Buffers.buffer(); + MutableDirectBuffer v = Buffers.buffer(); try (Transaction tx = env.createReadTransaction()) { try (Cursor cursor = db.openCursor(tx)) { @@ -122,8 +123,9 @@ public void testCursorSeekRange() throws Exception { try (Transaction tx = env.createReadTransaction()) { try (Cursor cursor = db.openCursor(tx)) { - DirectBuffer k = new DirectBuffer(byteBuffer); - DirectBuffer v = new DirectBuffer(); + MutableDirectBuffer k = Buffers.buffer(0); + k.wrap(byteBuffer); + MutableDirectBuffer v = Buffers.buffer(); k.putLong(0, 10); cursor.seekPosition(k, v, SeekOp.RANGE); @@ -150,8 +152,8 @@ public void testIteration() { List result = new ArrayList<>(); try (Transaction tx = env.createWriteTransaction()) { try (Cursor cursor = db.openCursor(tx)) { - DirectBuffer k = new DirectBuffer(); - DirectBuffer v = new DirectBuffer(); + MutableDirectBuffer k = Buffers.buffer(); + MutableDirectBuffer v = Buffers.buffer(); for (int rc = cursor.position(k, v, FIRST); rc != NOTFOUND; rc = cursor.position(k, v, NEXT)) { result.add(k.getLong(0)); } @@ -169,9 +171,9 @@ public void testReserve() { byte[] val = new byte[]{3, 2, 1}; try (Transaction tx = env.createWriteTransaction()) { - DirectBuffer keyBuf = new DirectBuffer(ByteBuffer.allocateDirect(key.length)); + MutableDirectBuffer keyBuf = Buffers.buffer(key.length); keyBuf.putBytes(0, key); - DirectBuffer valBuf = db.reserve(tx, keyBuf, val.length); + MutableDirectBuffer valBuf = db.reserve(tx, keyBuf, val.length); valBuf.putBytes(0, val); tx.commit(); } @@ -189,9 +191,9 @@ public void testReserveCursor() { try (Transaction tx = env.createWriteTransaction()) { try (Cursor cursor = db.openCursor(tx)) { - DirectBuffer keyBuf = new DirectBuffer(ByteBuffer.allocateDirect(key.length)); + MutableDirectBuffer keyBuf = Buffers.buffer(key.length); keyBuf.putBytes(0, key); - DirectBuffer valBuf = cursor.reserve(keyBuf, val.length); + MutableDirectBuffer valBuf = cursor.reserve(keyBuf, val.length); valBuf.putBytes(0, val); } tx.commit(); @@ -210,9 +212,9 @@ public void testReserveCursorRollback() { Transaction tx = env.createWriteTransaction(); Cursor cursor = db.openCursor(tx); - DirectBuffer keyBuf = new DirectBuffer(ByteBuffer.allocateDirect(key.length)); + MutableDirectBuffer keyBuf = Buffers.buffer(key.length); keyBuf.putBytes(0, key); - DirectBuffer valBuf = cursor.reserve(keyBuf, val.length); + MutableDirectBuffer valBuf = cursor.reserve(keyBuf, val.length); valBuf.putBytes(0, val); tx.abort(); diff --git a/lmdbjni/src/test/resources/car.xml b/lmdbjni/src/test/resources/car.xml new file mode 100644 index 0000000..852f5d7 --- /dev/null +++ b/lmdbjni/src/test/resources/car.xml @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + T + S + N + K + + + + + + + 9000 + + Petrol + + + + 0 + 1 + + + A + B + C + + + 0 + 1 + 2 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file