diff --git a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java index 5bd6b9fe37956..e3ef2cca8144b 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java @@ -170,7 +170,7 @@ private AllocationOutcome allocate(final long size, final boolean incomingUpdate } final AllocationOutcome finalOutcome = beyondLimit ? AllocationOutcome.FAILED_LOCAL : - parentOutcome.ok ? AllocationOutcome.SUCCESS : AllocationOutcome.FAILED_PARENT; + parentOutcome.isOk() ? AllocationOutcome.SUCCESS : AllocationOutcome.FAILED_PARENT; if (updatePeak) { updatePeak(); @@ -219,6 +219,15 @@ public long getLimit() { return allocationLimit.get(); } + /** + * Return the initial reservation. + * + * @return reservation in bytes. + */ + public long getInitReservation() { + return reservation; + } + /** * Set the maximum amount of memory that can be allocated in the this Accountant before failing * an allocation. @@ -258,39 +267,4 @@ public long getHeadroom() { return Math.min(localHeadroom, parent.getHeadroom()); } - /** - * Describes the type of outcome that occurred when trying to account for allocation of memory. - */ - public static enum AllocationOutcome { - - /** - * Allocation succeeded. - */ - SUCCESS(true), - - /** - * Allocation succeeded but only because the allocator was forced to move beyond a limit. - */ - FORCED_SUCCESS(true), - - /** - * Allocation failed because the local allocator's limits were exceeded. - */ - FAILED_LOCAL(false), - - /** - * Allocation failed because a parent allocator's limits were exceeded. - */ - FAILED_PARENT(false); - - private final boolean ok; - - AllocationOutcome(boolean ok) { - this.ok = ok; - } - - public boolean isOk() { - return ok; - } - } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java index d36cb37fc2e24..d6a97efb40839 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java @@ -21,8 +21,8 @@ /** * An allocation listener being notified for allocation/deallocation *
- * It is expected to be called from multiple threads and as such, - * provider should take care of making the implementation thread-safe + * It might be called from multiple threads if the allocator hierarchy shares a listener, in which + * case, the provider should take care of making the implementation thread-safe. */ public interface AllocationListener { @@ -30,6 +30,19 @@ public interface AllocationListener { @Override public void onAllocation(long size) { } + + @Override + public boolean onFailedAllocation(long size, AllocationOutcome outcome) { + return false; + } + + @Override + public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) { + } + + @Override + public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) { + } }; /** @@ -39,4 +52,28 @@ public void onAllocation(long size) { */ void onAllocation(long size); + /** + * Called whenever an allocation failed, giving the caller a chance to create some space in the allocator + * (either by freeing some resource, or by changing the limit), and, if successful, allowing the allocator + * to retry the allocation. + * + * @param size the buffer size that was being allocated + * @param outcome the outcome of the failed allocation. Carries information of what failed + * @return true, if the allocation can be retried; false if the allocation should fail + */ + boolean onFailedAllocation(long size, AllocationOutcome outcome); + + /** + * Called immediately after a child allocator was added to the parent allocator + * @param parentAllocator The parent allocator to which a child was added + * @param childAllocator The child allocator that was just added + */ + void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator); + + /** + * Called immediately after a child allocator was removed from the parent allocator + * @param parentAllocator The parent allocator from which a child was removed + * @param childAllocator The child allocator that was just removed + */ + void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator); } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationOutcome.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationOutcome.java new file mode 100644 index 0000000000000..9f2daa7aa4d08 --- /dev/null +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationOutcome.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.memory;
+
+/**
+ * Describes the type of outcome that occurred when trying to account for allocation of memory.
+ */
+public enum AllocationOutcome {
+
+ /**
+ * Allocation succeeded.
+ */
+ SUCCESS(true),
+
+ /**
+ * Allocation succeeded but only because the allocator was forced to move beyond a limit.
+ */
+ FORCED_SUCCESS(true),
+
+ /**
+ * Allocation failed because the local allocator's limits were exceeded.
+ */
+ FAILED_LOCAL(false),
+
+ /**
+ * Allocation failed because a parent allocator's limits were exceeded.
+ */
+ FAILED_PARENT(false);
+
+ private final boolean ok;
+
+ AllocationOutcome(boolean ok) {
+ this.ok = ok;
+ }
+
+ public boolean isOk() {
+ return ok;
+ }
+}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
index 2f70f737214dd..e7cb80a608857 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java
@@ -56,28 +56,21 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
private final HistoricalLog historicalLog;
private volatile boolean isClosed = false; // the allocator has been closed
+ /**
+ * Initialize an allocator
+ * @param parentAllocator parent allocator. null if defining a root allocator
+ * @param listener listener callback. Must be non-null -- use {@link AllocationListener#NOOP} if no listener
+ * desired
+ * @param name name of this allocator
+ * @param initReservation initial reservation. Cannot be modified after construction
+ * @param maxAllocation limit. Allocations past the limit fail. Can be modified after construction
+ */
protected BaseAllocator(
- final AllocationListener listener,
- final String name,
- final long initReservation,
- final long maxAllocation) throws OutOfMemoryException {
- this(listener, null, name, initReservation, maxAllocation);
- }
-
- protected BaseAllocator(
- final BaseAllocator parentAllocator,
- final String name,
- final long initReservation,
- final long maxAllocation) throws OutOfMemoryException {
- this(parentAllocator.listener, parentAllocator, name, initReservation, maxAllocation);
- }
-
- private BaseAllocator(
- final AllocationListener listener,
- final BaseAllocator parentAllocator,
- final String name,
- final long initReservation,
- final long maxAllocation) throws OutOfMemoryException {
+ final BaseAllocator parentAllocator,
+ final AllocationListener listener,
+ final String name,
+ final long initReservation,
+ final long maxAllocation) throws OutOfMemoryException {
super(parentAllocator, initReservation, maxAllocation);
this.listener = listener;
@@ -246,6 +239,7 @@ private void childClosed(final BaseAllocator childAllocator) {
}
}
}
+ listener.onChildRemoved(this, childAllocator);
}
@Override
@@ -276,7 +270,13 @@ public ArrowBuf buffer(final int initialRequestSize, BufferManager manager) {
: initialRequestSize;
AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
if (!outcome.isOk()) {
- throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize));
+ if (listener.onFailedAllocation(actualRequestSize, outcome)) {
+ // Second try, in case the listener can do something about it
+ outcome = this.allocateBytes(actualRequestSize);
+ }
+ if (!outcome.isOk()) {
+ throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize));
+ }
}
boolean success = false;
@@ -333,9 +333,18 @@ public BufferAllocator newChildAllocator(
final String name,
final long initReservation,
final long maxAllocation) {
+ return newChildAllocator(name, this.listener, initReservation, maxAllocation);
+ }
+
+ @Override
+ public BufferAllocator newChildAllocator(
+ final String name,
+ final AllocationListener listener,
+ final long initReservation,
+ final long maxAllocation) {
assertOpen();
- final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation,
+ final ChildAllocator childAllocator = new ChildAllocator(listener, this, name, initReservation,
maxAllocation);
if (DEBUG) {
@@ -345,6 +354,7 @@ public BufferAllocator newChildAllocator(
childAllocator.name);
}
}
+ this.listener.onChildAdded(this, childAllocator);
return childAllocator;
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
index b23a6e4bd8507..11ca38e089d18 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java
@@ -68,6 +68,17 @@ public interface BufferAllocator extends AutoCloseable {
*/
public BufferAllocator newChildAllocator(String name, long initReservation, long maxAllocation);
+ /**
+ * Create a new child allocator.
+ *
+ * @param name the name of the allocator.
+ * @param listener allocation listener for the newly created child
+ * @param initReservation the initial space reservation (obtained from this allocator)
+ * @param maxAllocation maximum amount of space the new allocator can allocate
+ * @return the new allocator, or null if it can't be created
+ */
+ public BufferAllocator newChildAllocator(String name, AllocationListener listener, long initReservation, long maxAllocation);
+
/**
* Close and release all buffers generated from this buffer pool.
*
@@ -91,6 +102,13 @@ public interface BufferAllocator extends AutoCloseable {
*/
public long getLimit();
+ /**
+ * Return the initial reservation.
+ *
+ * @return reservation in bytes.
+ */
+ public long getInitReservation();
+
/**
* Set the maximum amount of memory this allocator is allowed to allocate.
*
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
index f9a6dc72ece8c..03ec268d35cb1 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java
@@ -32,6 +32,7 @@ class ChildAllocator extends BaseAllocator {
/**
* Constructor.
*
+ * @param listener Allocation listener to be used in this child
* @param parentAllocator parent allocator -- the one creating this child
* @param name the name of this child allocator
* @param initReservation initial amount of space to reserve (obtained from the parent)
@@ -41,11 +42,12 @@ class ChildAllocator extends BaseAllocator {
* allocation policy in force, even less memory may be available
*/
ChildAllocator(
+ AllocationListener listener,
BaseAllocator parentAllocator,
String name,
long initReservation,
long maxAllocation) {
- super(parentAllocator, name, initReservation, maxAllocation);
+ super(parentAllocator, listener, name, initReservation, maxAllocation);
}
diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
index 1dc6bf0c92fa0..161b81a58b5be 100644
--- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
+++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java
@@ -31,7 +31,7 @@ public RootAllocator(final long limit) {
}
public RootAllocator(final AllocationListener listener, final long limit) {
- super(listener, "ROOT", 0, limit);
+ super(null, listener, "ROOT", 0, limit);
}
/**
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestAccountant.java b/java/memory/src/test/java/org/apache/arrow/memory/TestAccountant.java
index 100be069fe6d4..a45de487625e8 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestAccountant.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestAccountant.java
@@ -20,7 +20,6 @@
import static org.junit.Assert.assertEquals;
-import org.apache.arrow.memory.Accountant.AllocationOutcome;
import org.junit.Assert;
import org.junit.Test;
diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
index 76f2c501cf4c7..4ac35d132367b 100644
--- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
+++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java
@@ -222,7 +222,150 @@ public void testRootAllocator_createChildDontClose() throws Exception {
}
}
- private static void allocateAndFree(final BufferAllocator allocator) {
+ // Allocation listener
+ // It counts the number of times it has been invoked, and how much memory allocation it has seen
+ // When set to 'expand on fail', it attempts to expand the associated allocator's limit
+ private static final class TestAllocationListener implements AllocationListener {
+ private int numCalls;
+ private int numChildren;
+ private long totalMem;
+ private boolean expandOnFail;
+ BufferAllocator expandAlloc;
+ long expandLimit;
+
+ TestAllocationListener() {
+ this.numCalls = 0;
+ this.numChildren = 0;
+ this.totalMem = 0;
+ this.expandOnFail = false;
+ this.expandAlloc = null;
+ this.expandLimit = 0;
+ }
+
+ @Override
+ public void onAllocation(long size) {
+ numCalls++;
+ totalMem += size;
+ }
+
+ @Override
+ public boolean onFailedAllocation(long size, AllocationOutcome outcome) {
+ if (expandOnFail) {
+ expandAlloc.setLimit(expandLimit);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
+ ++numChildren;
+ }
+
+ @Override
+ public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) {
+ --numChildren;
+ }
+
+ void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
+ this.expandOnFail = true;
+ this.expandAlloc = expandAlloc;
+ this.expandLimit = expandLimit;
+ }
+
+ int getNumCalls() {
+ return numCalls;
+ }
+
+ int getNumChildren() {
+ return numChildren;
+ }
+
+ long getTotalMem() {
+ return totalMem;
+ }
+ }
+
+ @Test
+ public void testRootAllocator_listeners() throws Exception {
+ TestAllocationListener l1 = new TestAllocationListener();
+ assertEquals(0, l1.getNumCalls());
+ assertEquals(0, l1.getNumChildren());
+ assertEquals(0, l1.getTotalMem());
+ TestAllocationListener l2 = new TestAllocationListener();
+ assertEquals(0, l2.getNumCalls());
+ assertEquals(0, l2.getNumChildren());
+ assertEquals(0, l2.getTotalMem());
+ // root and first-level child share the first listener
+ // second-level and third-level child share the second listener
+ try (final RootAllocator rootAllocator = new RootAllocator(l1, MAX_ALLOCATION)) {
+ try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0, MAX_ALLOCATION)) {
+ assertEquals(1, l1.getNumChildren());
+ final ArrowBuf buf1 = c1.buffer(16);
+ assertNotNull("allocation failed", buf1);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(16, l1.getTotalMem());
+ buf1.release();
+ try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, MAX_ALLOCATION)) {
+ assertEquals(2, l1.getNumChildren()); // c1 got a new child, so c1's listener (l1) is notified
+ assertEquals(0, l2.getNumChildren());
+ final ArrowBuf buf2 = c2.buffer(32);
+ assertNotNull("allocation failed", buf2);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(16, l1.getTotalMem());
+ assertEquals(1, l2.getNumCalls());
+ assertEquals(32, l2.getTotalMem());
+ buf2.release();
+ try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) {
+ assertEquals(2, l1.getNumChildren());
+ assertEquals(1, l2.getNumChildren());
+ final ArrowBuf buf3 = c3.buffer(64);
+ assertNotNull("allocation failed", buf3);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(16, l1.getTotalMem());
+ assertEquals(2, l2.getNumCalls());
+ assertEquals(32 + 64, l2.getTotalMem());
+ buf3.release();
+ }
+ assertEquals(2, l1.getNumChildren());
+ assertEquals(0, l2.getNumChildren()); // third-level child removed
+ }
+ assertEquals(1, l1.getNumChildren()); // second-level child removed
+ assertEquals(0, l2.getNumChildren());
+ }
+ assertEquals(0, l1.getNumChildren()); // first-level child removed
+ }
+ }
+
+ @Test
+ public void testRootAllocator_listenerAllocationFail() throws Exception {
+ TestAllocationListener l1 = new TestAllocationListener();
+ assertEquals(0, l1.getNumCalls());
+ assertEquals(0, l1.getTotalMem());
+ // Test attempts to allocate too much from a child whose limit is set to half of the max allocation
+ // The listener's callback triggers, expanding the child allocator's limit, so then the allocation succeeds
+ try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
+ try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", l1,0, MAX_ALLOCATION / 2)) {
+ try {
+ c1.buffer(MAX_ALLOCATION);
+ fail("allocated memory beyond max allowed");
+ } catch (OutOfMemoryException e) {
+ // expected
+ }
+ assertEquals(0, l1.getNumCalls());
+ assertEquals(0, l1.getTotalMem());
+
+ l1.setExpandOnFail(c1, MAX_ALLOCATION);
+ ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION);
+ assertNotNull("allocation failed", arrowBuf);
+ assertEquals(1, l1.getNumCalls());
+ assertEquals(MAX_ALLOCATION, l1.getTotalMem());
+ arrowBuf.release();
+ }
+ }
+ }
+
+ private static void allocateAndFree(final BufferAllocator allocator) {
final ArrowBuf arrowBuf = allocator.buffer(512);
assertNotNull("allocation failed", arrowBuf);
arrowBuf.release();
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java
index 1cfa0666a3139..8b27f39bfb215 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -290,8 +290,7 @@ private void reallocTypeBuffer() {
final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
newBuf.setBytes(0, typeBuffer, 0, currentBufferCapacity);
- final int halfNewCapacity = newBuf.capacity() / 2;
- newBuf.setZero(halfNewCapacity, halfNewCapacity);
+ newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
typeBuffer.release(1);
typeBuffer = newBuf;
typeBufferAllocationSizeInBytes = (int)newAllocationSize;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java
index 4b47df8a450a0..5c456368c4961 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java
@@ -281,7 +281,6 @@ public boolean allocateNewSafe() {
try {
allocateBytes(curAllocationSizeValue, curAllocationSizeValidity);
} catch (Exception e) {
- e.printStackTrace();
clear();
return false;
}
@@ -314,7 +313,6 @@ public void allocateNew(int valueCount) {
try {
allocateBytes(valueBufferSize, validityBufferSize);
} catch (Exception e) {
- e.printStackTrace();
clear();
throw e;
}
@@ -452,8 +450,7 @@ private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean dataBuffer)
final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize);
newBuf.setBytes(0, buffer, 0, currentBufferCapacity);
- final int halfNewCapacity = newBuf.capacity() / 2;
- newBuf.setZero(halfNewCapacity, halfNewCapacity);
+ newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
buffer.release(1);
buffer = newBuf;
if (dataBuffer) {
@@ -841,4 +838,4 @@ protected void handleSafe(int index) {
reAlloc();
}
}
-}
\ No newline at end of file
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java
index ecb3c780efca4..3ebbf81fba160 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java
@@ -394,7 +394,6 @@ public boolean allocateNewSafe() {
try {
allocateBytes(curAllocationSizeValue, curAllocationSizeValidity, curAllocationSizeOffset);
} catch (Exception e) {
- e.printStackTrace();
clear();
return false;
}
@@ -427,8 +426,8 @@ public void allocateNew(int totalBytes, int valueCount) {
try {
allocateBytes(totalBytes, validityBufferSize, offsetBufferSize);
} catch (Exception e) {
- e.printStackTrace();
clear();
+ throw e;
}
}
@@ -550,8 +549,7 @@ private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean offsetBuffer
final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize);
newBuf.setBytes(0, buffer, 0, currentBufferCapacity);
- final int halfNewCapacity = newBuf.capacity() / 2;
- newBuf.setZero(halfNewCapacity, halfNewCapacity);
+ newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
buffer.release(1);
buffer = newBuf;
if (offsetBuffer) {
@@ -711,9 +709,7 @@ public void splitAndTransferTo(int startIndex, int length,
splitAndTransferValidityBuffer(startIndex, length, target);
splitAndTransferOffsetBuffer(startIndex, length, target);
target.setLastSet(length - 1);
- if (this.valueCount > 0) {
- target.setValueCount(this.valueCount);
- }
+ target.setValueCount(length);
}
/*
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/DecimalVector.java b/java/vector/src/main/java/org/apache/arrow/vector/DecimalVector.java
index a043575081fb3..db1ad0b31b3a8 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/DecimalVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/DecimalVector.java
@@ -18,6 +18,7 @@
package org.apache.arrow.vector;
+import com.google.common.base.Preconditions;
import io.netty.buffer.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.complex.impl.DecimalReaderImpl;
@@ -211,7 +212,6 @@ public void set(int index, ArrowBuf buffer) {
* @param value array of bytes containing decimal in big endian byte order.
*/
public void setBigEndian(int index, byte[] value) {
- assert value.length <= TYPE_WIDTH;
BitVectorHelper.setValidityBitToOne(validityBuffer, index);
final int length = value.length;
int startIndex = index * TYPE_WIDTH;
@@ -223,13 +223,32 @@ public void setBigEndian(int index, byte[] value) {
valueBuffer.setByte(startIndex + 3, value[i-3]);
startIndex += 4;
}
- } else {
+
+ return;
+ }
+
+ if (length == 0) {
+ valueBuffer.setZero(startIndex, TYPE_WIDTH);
+ return;
+ }
+
+ if (length < 16) {
for (int i = length - 1; i >= 0; i--) {
valueBuffer.setByte(startIndex, value[i]);
startIndex++;
}
- valueBuffer.setZero(startIndex, TYPE_WIDTH - length);
+
+ final byte pad = (byte) (value[0] < 0 ? 0xFF : 0x00);
+ final int maxStartIndex = (index + 1) * TYPE_WIDTH;
+ while (startIndex < maxStartIndex) {
+ valueBuffer.setByte(startIndex, pad);
+ startIndex++;
+ }
+
+ return;
}
+
+ throw new IllegalArgumentException("Invalid decimal value length. Valid length in [1 - 16], got " + length);
}
/**
@@ -467,4 +486,4 @@ public void copyValueSafe(int fromIndex, int toIndex) {
to.copyFromSafe(fromIndex, toIndex, DecimalVector.this);
}
}
-}
\ No newline at end of file
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
index 2dd2894ffebf9..bd4f7aac7b117 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
@@ -117,8 +117,7 @@ protected void reallocOffsetBuffer() {
final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize);
newBuf.setBytes(0, offsetBuffer, 0, currentBufferCapacity);
- final int halfNewCapacity = newBuf.capacity() / 2;
- newBuf.setZero(halfNewCapacity, halfNewCapacity);
+ newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
offsetBuffer.release(1);
offsetBuffer = newBuf;
offsetAllocationSizeInBytes = (int) newAllocationSize;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
index eadbab43619a6..f863bb6791856 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
@@ -217,8 +217,8 @@ private void reallocValidityBuffer() {
}
final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize);
- newBuf.setZero(0, newBuf.capacity());
newBuf.setBytes(0, validityBuffer, 0, currentBufferCapacity);
+ newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
validityBuffer.release(1);
validityBuffer = newBuf;
validityAllocationSizeInBytes = (int) newAllocationSize;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index d3eeaf2f0df7a..9666d800e4fe2 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -31,12 +31,7 @@
import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
-import org.apache.arrow.vector.AddOrGetResult;
-import org.apache.arrow.vector.BufferBacked;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.ZeroVector;
-import org.apache.arrow.vector.BitVectorHelper;
+import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.impl.ComplexCopier;
import org.apache.arrow.vector.complex.impl.UnionListReader;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
@@ -301,8 +296,7 @@ private void reallocValidityBuffer() {
final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize);
newBuf.setBytes(0, validityBuffer, 0, currentBufferCapacity);
- final int halfNewCapacity = newBuf.capacity() / 2;
- newBuf.setZero(halfNewCapacity, halfNewCapacity);
+ newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
validityBuffer.release(1);
validityBuffer = newBuf;
validityAllocationSizeInBytes = (int) newAllocationSize;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java
index 05571bbb931c6..a90902fc86187 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java
@@ -130,7 +130,7 @@ public TransferPair getTransferPair(BufferAllocator allocator) {
@Override
public TransferPair makeTransferPair(ValueVector to) {
- return new NullableStructTransferPair(this, (StructVector) to, true);
+ return new NullableStructTransferPair(this, (StructVector) to, false);
}
@Override
@@ -414,8 +414,8 @@ private void reallocValidityBuffer() {
}
final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize);
- newBuf.setZero(0, newBuf.capacity());
newBuf.setBytes(0, validityBuffer, 0, currentBufferCapacity);
+ newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
validityBuffer.release(1);
validityBuffer = newBuf;
validityAllocationSizeInBytes = (int) newAllocationSize;
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
index 8c86452fcc3bf..15c56ae2bc382 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -190,4 +191,57 @@ public void testBigDecimalReadWrite() {
assertEquals(decimal8, decimalVector.getObject(7));
}
}
+
+ /**
+ * Test {@link DecimalVector#setBigEndian(int, byte[])} which takes BE layout input and stores in LE layout.
+ * Cases to cover: value given in byte array in different lengths in range [1-16] and negative values.
+ */
+ @Test
+ public void decimalBE2LE() {
+ try (DecimalVector decimalVector = TestUtils.newVector(DecimalVector.class, "decimal", new ArrowType.Decimal(21, 2), allocator);) {
+ decimalVector.allocateNew();
+
+ BigInteger[] testBigInts = new BigInteger[] {
+ new BigInteger("0"),
+ new BigInteger("-1"),
+ new BigInteger("23"),
+ new BigInteger("234234"),
+ new BigInteger("-234234234"),
+ new BigInteger("234234234234"),
+ new BigInteger("-56345345345345"),
+ new BigInteger("29823462983462893462934679234653456345"), // converts to 16 byte array
+ new BigInteger("-3894572983475982374598324598234346536"), // converts to 16 byte array
+ new BigInteger("-345345"),
+ new BigInteger("754533")
+ };
+
+ int insertionIdx = 0;
+ insertionIdx++; // insert a null
+ for (BigInteger val : testBigInts) {
+ decimalVector.setBigEndian(insertionIdx++, val.toByteArray());
+ }
+ insertionIdx++; // insert a null
+ // insert a zero length buffer
+ decimalVector.setBigEndian(insertionIdx++, new byte[0]);
+
+ // Try inserting a buffer larger than 16bytes and expect a failure
+ try {
+ decimalVector.setBigEndian(insertionIdx, new byte[17]);
+ fail("above statement should have failed");
+ } catch (IllegalArgumentException ex) {
+ assertTrue(ex.getMessage().equals("Invalid decimal value length. Valid length in [1 - 16], got 17"));
+ }
+ decimalVector.setValueCount(insertionIdx);
+
+ // retrieve values and check if they are correct
+ int outputIdx = 0;
+ assertTrue(decimalVector.isNull(outputIdx++));
+ for (BigInteger expected : testBigInts) {
+ final BigDecimal actual = decimalVector.getObject(outputIdx++);
+ assertEquals(expected, actual.unscaledValue());
+ }
+ assertTrue(decimalVector.isNull(outputIdx++));
+ assertEquals(BigInteger.valueOf(0), decimalVector.getObject(outputIdx).unscaledValue());
+ }
+ }
}
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
index d12586ecc59d1..5150273aab97d 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java
@@ -828,4 +828,49 @@ public void testSetInitialCapacity() {
assertEquals(1, vector.getDataVector().getValueCapacity());
}
}
+
+ @Test
+ public void testClearAndReuse() {
+ try (final ListVector vector = ListVector.empty("list", allocator)) {
+ BigIntVector bigIntVector = (BigIntVector) vector.addOrGetVector(FieldType.nullable(MinorType.BIGINT.getType())).getVector();
+ vector.setInitialCapacity(10);
+ vector.allocateNew();
+
+ vector.startNewValue(0);
+ bigIntVector.setSafe(0, 7);
+ vector.endValue(0, 1);
+ vector.startNewValue(1);
+ bigIntVector.setSafe(1, 8);
+ vector.endValue(1, 1);
+ vector.setValueCount(2);
+
+ Object result = vector.getObject(0);
+ ArrayList