diff --git a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java index 5bd6b9fe37956..e3ef2cca8144b 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/Accountant.java @@ -170,7 +170,7 @@ private AllocationOutcome allocate(final long size, final boolean incomingUpdate } final AllocationOutcome finalOutcome = beyondLimit ? AllocationOutcome.FAILED_LOCAL : - parentOutcome.ok ? AllocationOutcome.SUCCESS : AllocationOutcome.FAILED_PARENT; + parentOutcome.isOk() ? AllocationOutcome.SUCCESS : AllocationOutcome.FAILED_PARENT; if (updatePeak) { updatePeak(); @@ -219,6 +219,15 @@ public long getLimit() { return allocationLimit.get(); } + /** + * Return the initial reservation. + * + * @return reservation in bytes. + */ + public long getInitReservation() { + return reservation; + } + /** * Set the maximum amount of memory that can be allocated in the this Accountant before failing * an allocation. @@ -258,39 +267,4 @@ public long getHeadroom() { return Math.min(localHeadroom, parent.getHeadroom()); } - /** - * Describes the type of outcome that occurred when trying to account for allocation of memory. - */ - public static enum AllocationOutcome { - - /** - * Allocation succeeded. - */ - SUCCESS(true), - - /** - * Allocation succeeded but only because the allocator was forced to move beyond a limit. - */ - FORCED_SUCCESS(true), - - /** - * Allocation failed because the local allocator's limits were exceeded. - */ - FAILED_LOCAL(false), - - /** - * Allocation failed because a parent allocator's limits were exceeded. - */ - FAILED_PARENT(false); - - private final boolean ok; - - AllocationOutcome(boolean ok) { - this.ok = ok; - } - - public boolean isOk() { - return ok; - } - } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java index d36cb37fc2e24..d6a97efb40839 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java @@ -21,8 +21,8 @@ /** * An allocation listener being notified for allocation/deallocation *

- * It is expected to be called from multiple threads and as such, - * provider should take care of making the implementation thread-safe + * It might be called from multiple threads if the allocator hierarchy shares a listener, in which + * case, the provider should take care of making the implementation thread-safe. */ public interface AllocationListener { @@ -30,6 +30,19 @@ public interface AllocationListener { @Override public void onAllocation(long size) { } + + @Override + public boolean onFailedAllocation(long size, AllocationOutcome outcome) { + return false; + } + + @Override + public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) { + } + + @Override + public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) { + } }; /** @@ -39,4 +52,28 @@ public void onAllocation(long size) { */ void onAllocation(long size); + /** + * Called whenever an allocation failed, giving the caller a chance to create some space in the allocator + * (either by freeing some resource, or by changing the limit), and, if successful, allowing the allocator + * to retry the allocation. + * + * @param size the buffer size that was being allocated + * @param outcome the outcome of the failed allocation. Carries information of what failed + * @return true, if the allocation can be retried; false if the allocation should fail + */ + boolean onFailedAllocation(long size, AllocationOutcome outcome); + + /** + * Called immediately after a child allocator was added to the parent allocator + * @param parentAllocator The parent allocator to which a child was added + * @param childAllocator The child allocator that was just added + */ + void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator); + + /** + * Called immediately after a child allocator was removed from the parent allocator + * @param parentAllocator The parent allocator from which a child was removed + * @param childAllocator The child allocator that was just removed + */ + void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator); } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationOutcome.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationOutcome.java new file mode 100644 index 0000000000000..9f2daa7aa4d08 --- /dev/null +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationOutcome.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +/** + * Describes the type of outcome that occurred when trying to account for allocation of memory. + */ +public enum AllocationOutcome { + + /** + * Allocation succeeded. + */ + SUCCESS(true), + + /** + * Allocation succeeded but only because the allocator was forced to move beyond a limit. + */ + FORCED_SUCCESS(true), + + /** + * Allocation failed because the local allocator's limits were exceeded. + */ + FAILED_LOCAL(false), + + /** + * Allocation failed because a parent allocator's limits were exceeded. + */ + FAILED_PARENT(false); + + private final boolean ok; + + AllocationOutcome(boolean ok) { + this.ok = ok; + } + + public boolean isOk() { + return ok; + } +} diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 2f70f737214dd..e7cb80a608857 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -56,28 +56,21 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato private final HistoricalLog historicalLog; private volatile boolean isClosed = false; // the allocator has been closed + /** + * Initialize an allocator + * @param parentAllocator parent allocator. null if defining a root allocator + * @param listener listener callback. Must be non-null -- use {@link AllocationListener#NOOP} if no listener + * desired + * @param name name of this allocator + * @param initReservation initial reservation. Cannot be modified after construction + * @param maxAllocation limit. Allocations past the limit fail. Can be modified after construction + */ protected BaseAllocator( - final AllocationListener listener, - final String name, - final long initReservation, - final long maxAllocation) throws OutOfMemoryException { - this(listener, null, name, initReservation, maxAllocation); - } - - protected BaseAllocator( - final BaseAllocator parentAllocator, - final String name, - final long initReservation, - final long maxAllocation) throws OutOfMemoryException { - this(parentAllocator.listener, parentAllocator, name, initReservation, maxAllocation); - } - - private BaseAllocator( - final AllocationListener listener, - final BaseAllocator parentAllocator, - final String name, - final long initReservation, - final long maxAllocation) throws OutOfMemoryException { + final BaseAllocator parentAllocator, + final AllocationListener listener, + final String name, + final long initReservation, + final long maxAllocation) throws OutOfMemoryException { super(parentAllocator, initReservation, maxAllocation); this.listener = listener; @@ -246,6 +239,7 @@ private void childClosed(final BaseAllocator childAllocator) { } } } + listener.onChildRemoved(this, childAllocator); } @Override @@ -276,7 +270,13 @@ public ArrowBuf buffer(final int initialRequestSize, BufferManager manager) { : initialRequestSize; AllocationOutcome outcome = this.allocateBytes(actualRequestSize); if (!outcome.isOk()) { - throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize)); + if (listener.onFailedAllocation(actualRequestSize, outcome)) { + // Second try, in case the listener can do something about it + outcome = this.allocateBytes(actualRequestSize); + } + if (!outcome.isOk()) { + throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize)); + } } boolean success = false; @@ -333,9 +333,18 @@ public BufferAllocator newChildAllocator( final String name, final long initReservation, final long maxAllocation) { + return newChildAllocator(name, this.listener, initReservation, maxAllocation); + } + + @Override + public BufferAllocator newChildAllocator( + final String name, + final AllocationListener listener, + final long initReservation, + final long maxAllocation) { assertOpen(); - final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation, + final ChildAllocator childAllocator = new ChildAllocator(listener, this, name, initReservation, maxAllocation); if (DEBUG) { @@ -345,6 +354,7 @@ public BufferAllocator newChildAllocator( childAllocator.name); } } + this.listener.onChildAdded(this, childAllocator); return childAllocator; } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java index b23a6e4bd8507..11ca38e089d18 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java @@ -68,6 +68,17 @@ public interface BufferAllocator extends AutoCloseable { */ public BufferAllocator newChildAllocator(String name, long initReservation, long maxAllocation); + /** + * Create a new child allocator. + * + * @param name the name of the allocator. + * @param listener allocation listener for the newly created child + * @param initReservation the initial space reservation (obtained from this allocator) + * @param maxAllocation maximum amount of space the new allocator can allocate + * @return the new allocator, or null if it can't be created + */ + public BufferAllocator newChildAllocator(String name, AllocationListener listener, long initReservation, long maxAllocation); + /** * Close and release all buffers generated from this buffer pool. * @@ -91,6 +102,13 @@ public interface BufferAllocator extends AutoCloseable { */ public long getLimit(); + /** + * Return the initial reservation. + * + * @return reservation in bytes. + */ + public long getInitReservation(); + /** * Set the maximum amount of memory this allocator is allowed to allocate. * diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java index f9a6dc72ece8c..03ec268d35cb1 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java @@ -32,6 +32,7 @@ class ChildAllocator extends BaseAllocator { /** * Constructor. * + * @param listener Allocation listener to be used in this child * @param parentAllocator parent allocator -- the one creating this child * @param name the name of this child allocator * @param initReservation initial amount of space to reserve (obtained from the parent) @@ -41,11 +42,12 @@ class ChildAllocator extends BaseAllocator { * allocation policy in force, even less memory may be available */ ChildAllocator( + AllocationListener listener, BaseAllocator parentAllocator, String name, long initReservation, long maxAllocation) { - super(parentAllocator, name, initReservation, maxAllocation); + super(parentAllocator, listener, name, initReservation, maxAllocation); } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java index 1dc6bf0c92fa0..161b81a58b5be 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java @@ -31,7 +31,7 @@ public RootAllocator(final long limit) { } public RootAllocator(final AllocationListener listener, final long limit) { - super(listener, "ROOT", 0, limit); + super(null, listener, "ROOT", 0, limit); } /** diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestAccountant.java b/java/memory/src/test/java/org/apache/arrow/memory/TestAccountant.java index 100be069fe6d4..a45de487625e8 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestAccountant.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestAccountant.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals; -import org.apache.arrow.memory.Accountant.AllocationOutcome; import org.junit.Assert; import org.junit.Test; diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 76f2c501cf4c7..4ac35d132367b 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -222,7 +222,150 @@ public void testRootAllocator_createChildDontClose() throws Exception { } } - private static void allocateAndFree(final BufferAllocator allocator) { + // Allocation listener + // It counts the number of times it has been invoked, and how much memory allocation it has seen + // When set to 'expand on fail', it attempts to expand the associated allocator's limit + private static final class TestAllocationListener implements AllocationListener { + private int numCalls; + private int numChildren; + private long totalMem; + private boolean expandOnFail; + BufferAllocator expandAlloc; + long expandLimit; + + TestAllocationListener() { + this.numCalls = 0; + this.numChildren = 0; + this.totalMem = 0; + this.expandOnFail = false; + this.expandAlloc = null; + this.expandLimit = 0; + } + + @Override + public void onAllocation(long size) { + numCalls++; + totalMem += size; + } + + @Override + public boolean onFailedAllocation(long size, AllocationOutcome outcome) { + if (expandOnFail) { + expandAlloc.setLimit(expandLimit); + return true; + } + return false; + } + + @Override + public void onChildAdded(BufferAllocator parentAllocator, BufferAllocator childAllocator) { + ++numChildren; + } + + @Override + public void onChildRemoved(BufferAllocator parentAllocator, BufferAllocator childAllocator) { + --numChildren; + } + + void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) { + this.expandOnFail = true; + this.expandAlloc = expandAlloc; + this.expandLimit = expandLimit; + } + + int getNumCalls() { + return numCalls; + } + + int getNumChildren() { + return numChildren; + } + + long getTotalMem() { + return totalMem; + } + } + + @Test + public void testRootAllocator_listeners() throws Exception { + TestAllocationListener l1 = new TestAllocationListener(); + assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getNumChildren()); + assertEquals(0, l1.getTotalMem()); + TestAllocationListener l2 = new TestAllocationListener(); + assertEquals(0, l2.getNumCalls()); + assertEquals(0, l2.getNumChildren()); + assertEquals(0, l2.getTotalMem()); + // root and first-level child share the first listener + // second-level and third-level child share the second listener + try (final RootAllocator rootAllocator = new RootAllocator(l1, MAX_ALLOCATION)) { + try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0, MAX_ALLOCATION)) { + assertEquals(1, l1.getNumChildren()); + final ArrowBuf buf1 = c1.buffer(16); + assertNotNull("allocation failed", buf1); + assertEquals(1, l1.getNumCalls()); + assertEquals(16, l1.getTotalMem()); + buf1.release(); + try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, MAX_ALLOCATION)) { + assertEquals(2, l1.getNumChildren()); // c1 got a new child, so c1's listener (l1) is notified + assertEquals(0, l2.getNumChildren()); + final ArrowBuf buf2 = c2.buffer(32); + assertNotNull("allocation failed", buf2); + assertEquals(1, l1.getNumCalls()); + assertEquals(16, l1.getTotalMem()); + assertEquals(1, l2.getNumCalls()); + assertEquals(32, l2.getTotalMem()); + buf2.release(); + try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) { + assertEquals(2, l1.getNumChildren()); + assertEquals(1, l2.getNumChildren()); + final ArrowBuf buf3 = c3.buffer(64); + assertNotNull("allocation failed", buf3); + assertEquals(1, l1.getNumCalls()); + assertEquals(16, l1.getTotalMem()); + assertEquals(2, l2.getNumCalls()); + assertEquals(32 + 64, l2.getTotalMem()); + buf3.release(); + } + assertEquals(2, l1.getNumChildren()); + assertEquals(0, l2.getNumChildren()); // third-level child removed + } + assertEquals(1, l1.getNumChildren()); // second-level child removed + assertEquals(0, l2.getNumChildren()); + } + assertEquals(0, l1.getNumChildren()); // first-level child removed + } + } + + @Test + public void testRootAllocator_listenerAllocationFail() throws Exception { + TestAllocationListener l1 = new TestAllocationListener(); + assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getTotalMem()); + // Test attempts to allocate too much from a child whose limit is set to half of the max allocation + // The listener's callback triggers, expanding the child allocator's limit, so then the allocation succeeds + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", l1,0, MAX_ALLOCATION / 2)) { + try { + c1.buffer(MAX_ALLOCATION); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getTotalMem()); + + l1.setExpandOnFail(c1, MAX_ALLOCATION); + ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION); + assertNotNull("allocation failed", arrowBuf); + assertEquals(1, l1.getNumCalls()); + assertEquals(MAX_ALLOCATION, l1.getTotalMem()); + arrowBuf.release(); + } + } + } + + private static void allocateAndFree(final BufferAllocator allocator) { final ArrowBuf arrowBuf = allocator.buffer(512); assertNotNull("allocation failed", arrowBuf); arrowBuf.release(); diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index 1cfa0666a3139..8b27f39bfb215 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -290,8 +290,7 @@ private void reallocTypeBuffer() { final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize); newBuf.setBytes(0, typeBuffer, 0, currentBufferCapacity); - final int halfNewCapacity = newBuf.capacity() / 2; - newBuf.setZero(halfNewCapacity, halfNewCapacity); + newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity); typeBuffer.release(1); typeBuffer = newBuf; typeBufferAllocationSizeInBytes = (int)newAllocationSize; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java index 4b47df8a450a0..5c456368c4961 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java @@ -281,7 +281,6 @@ public boolean allocateNewSafe() { try { allocateBytes(curAllocationSizeValue, curAllocationSizeValidity); } catch (Exception e) { - e.printStackTrace(); clear(); return false; } @@ -314,7 +313,6 @@ public void allocateNew(int valueCount) { try { allocateBytes(valueBufferSize, validityBufferSize); } catch (Exception e) { - e.printStackTrace(); clear(); throw e; } @@ -452,8 +450,7 @@ private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean dataBuffer) final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize); newBuf.setBytes(0, buffer, 0, currentBufferCapacity); - final int halfNewCapacity = newBuf.capacity() / 2; - newBuf.setZero(halfNewCapacity, halfNewCapacity); + newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity); buffer.release(1); buffer = newBuf; if (dataBuffer) { @@ -841,4 +838,4 @@ protected void handleSafe(int index) { reAlloc(); } } -} \ No newline at end of file +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java index ecb3c780efca4..3ebbf81fba160 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java @@ -394,7 +394,6 @@ public boolean allocateNewSafe() { try { allocateBytes(curAllocationSizeValue, curAllocationSizeValidity, curAllocationSizeOffset); } catch (Exception e) { - e.printStackTrace(); clear(); return false; } @@ -427,8 +426,8 @@ public void allocateNew(int totalBytes, int valueCount) { try { allocateBytes(totalBytes, validityBufferSize, offsetBufferSize); } catch (Exception e) { - e.printStackTrace(); clear(); + throw e; } } @@ -550,8 +549,7 @@ private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean offsetBuffer final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize); newBuf.setBytes(0, buffer, 0, currentBufferCapacity); - final int halfNewCapacity = newBuf.capacity() / 2; - newBuf.setZero(halfNewCapacity, halfNewCapacity); + newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity); buffer.release(1); buffer = newBuf; if (offsetBuffer) { @@ -711,9 +709,7 @@ public void splitAndTransferTo(int startIndex, int length, splitAndTransferValidityBuffer(startIndex, length, target); splitAndTransferOffsetBuffer(startIndex, length, target); target.setLastSet(length - 1); - if (this.valueCount > 0) { - target.setValueCount(this.valueCount); - } + target.setValueCount(length); } /* diff --git a/java/vector/src/main/java/org/apache/arrow/vector/DecimalVector.java b/java/vector/src/main/java/org/apache/arrow/vector/DecimalVector.java index a043575081fb3..db1ad0b31b3a8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/DecimalVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/DecimalVector.java @@ -18,6 +18,7 @@ package org.apache.arrow.vector; +import com.google.common.base.Preconditions; import io.netty.buffer.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.complex.impl.DecimalReaderImpl; @@ -211,7 +212,6 @@ public void set(int index, ArrowBuf buffer) { * @param value array of bytes containing decimal in big endian byte order. */ public void setBigEndian(int index, byte[] value) { - assert value.length <= TYPE_WIDTH; BitVectorHelper.setValidityBitToOne(validityBuffer, index); final int length = value.length; int startIndex = index * TYPE_WIDTH; @@ -223,13 +223,32 @@ public void setBigEndian(int index, byte[] value) { valueBuffer.setByte(startIndex + 3, value[i-3]); startIndex += 4; } - } else { + + return; + } + + if (length == 0) { + valueBuffer.setZero(startIndex, TYPE_WIDTH); + return; + } + + if (length < 16) { for (int i = length - 1; i >= 0; i--) { valueBuffer.setByte(startIndex, value[i]); startIndex++; } - valueBuffer.setZero(startIndex, TYPE_WIDTH - length); + + final byte pad = (byte) (value[0] < 0 ? 0xFF : 0x00); + final int maxStartIndex = (index + 1) * TYPE_WIDTH; + while (startIndex < maxStartIndex) { + valueBuffer.setByte(startIndex, pad); + startIndex++; + } + + return; } + + throw new IllegalArgumentException("Invalid decimal value length. Valid length in [1 - 16], got " + length); } /** @@ -467,4 +486,4 @@ public void copyValueSafe(int fromIndex, int toIndex) { to.copyFromSafe(fromIndex, toIndex, DecimalVector.this); } } -} \ No newline at end of file +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java index 2dd2894ffebf9..bd4f7aac7b117 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java @@ -117,8 +117,7 @@ protected void reallocOffsetBuffer() { final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize); newBuf.setBytes(0, offsetBuffer, 0, currentBufferCapacity); - final int halfNewCapacity = newBuf.capacity() / 2; - newBuf.setZero(halfNewCapacity, halfNewCapacity); + newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity); offsetBuffer.release(1); offsetBuffer = newBuf; offsetAllocationSizeInBytes = (int) newAllocationSize; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java index eadbab43619a6..f863bb6791856 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java @@ -217,8 +217,8 @@ private void reallocValidityBuffer() { } final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize); - newBuf.setZero(0, newBuf.capacity()); newBuf.setBytes(0, validityBuffer, 0, currentBufferCapacity); + newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity); validityBuffer.release(1); validityBuffer = newBuf; validityAllocationSizeInBytes = (int) newAllocationSize; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index d3eeaf2f0df7a..9666d800e4fe2 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -31,12 +31,7 @@ import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.OutOfMemoryException; -import org.apache.arrow.vector.AddOrGetResult; -import org.apache.arrow.vector.BufferBacked; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.ValueVector; -import org.apache.arrow.vector.ZeroVector; -import org.apache.arrow.vector.BitVectorHelper; +import org.apache.arrow.vector.*; import org.apache.arrow.vector.complex.impl.ComplexCopier; import org.apache.arrow.vector.complex.impl.UnionListReader; import org.apache.arrow.vector.complex.impl.UnionListWriter; @@ -301,8 +296,7 @@ private void reallocValidityBuffer() { final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize); newBuf.setBytes(0, validityBuffer, 0, currentBufferCapacity); - final int halfNewCapacity = newBuf.capacity() / 2; - newBuf.setZero(halfNewCapacity, halfNewCapacity); + newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity); validityBuffer.release(1); validityBuffer = newBuf; validityAllocationSizeInBytes = (int) newAllocationSize; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java index 05571bbb931c6..a90902fc86187 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java @@ -130,7 +130,7 @@ public TransferPair getTransferPair(BufferAllocator allocator) { @Override public TransferPair makeTransferPair(ValueVector to) { - return new NullableStructTransferPair(this, (StructVector) to, true); + return new NullableStructTransferPair(this, (StructVector) to, false); } @Override @@ -414,8 +414,8 @@ private void reallocValidityBuffer() { } final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize); - newBuf.setZero(0, newBuf.capacity()); newBuf.setBytes(0, validityBuffer, 0, currentBufferCapacity); + newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity); validityBuffer.release(1); validityBuffer = newBuf; validityAllocationSizeInBytes = (int) newAllocationSize; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java index 8c86452fcc3bf..15c56ae2bc382 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestDecimalVector.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.math.BigDecimal; import java.math.BigInteger; @@ -190,4 +191,57 @@ public void testBigDecimalReadWrite() { assertEquals(decimal8, decimalVector.getObject(7)); } } + + /** + * Test {@link DecimalVector#setBigEndian(int, byte[])} which takes BE layout input and stores in LE layout. + * Cases to cover: value given in byte array in different lengths in range [1-16] and negative values. + */ + @Test + public void decimalBE2LE() { + try (DecimalVector decimalVector = TestUtils.newVector(DecimalVector.class, "decimal", new ArrowType.Decimal(21, 2), allocator);) { + decimalVector.allocateNew(); + + BigInteger[] testBigInts = new BigInteger[] { + new BigInteger("0"), + new BigInteger("-1"), + new BigInteger("23"), + new BigInteger("234234"), + new BigInteger("-234234234"), + new BigInteger("234234234234"), + new BigInteger("-56345345345345"), + new BigInteger("29823462983462893462934679234653456345"), // converts to 16 byte array + new BigInteger("-3894572983475982374598324598234346536"), // converts to 16 byte array + new BigInteger("-345345"), + new BigInteger("754533") + }; + + int insertionIdx = 0; + insertionIdx++; // insert a null + for (BigInteger val : testBigInts) { + decimalVector.setBigEndian(insertionIdx++, val.toByteArray()); + } + insertionIdx++; // insert a null + // insert a zero length buffer + decimalVector.setBigEndian(insertionIdx++, new byte[0]); + + // Try inserting a buffer larger than 16bytes and expect a failure + try { + decimalVector.setBigEndian(insertionIdx, new byte[17]); + fail("above statement should have failed"); + } catch (IllegalArgumentException ex) { + assertTrue(ex.getMessage().equals("Invalid decimal value length. Valid length in [1 - 16], got 17")); + } + decimalVector.setValueCount(insertionIdx); + + // retrieve values and check if they are correct + int outputIdx = 0; + assertTrue(decimalVector.isNull(outputIdx++)); + for (BigInteger expected : testBigInts) { + final BigDecimal actual = decimalVector.getObject(outputIdx++); + assertEquals(expected, actual.unscaledValue()); + } + assertTrue(decimalVector.isNull(outputIdx++)); + assertEquals(BigInteger.valueOf(0), decimalVector.getObject(outputIdx).unscaledValue()); + } + } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java index d12586ecc59d1..5150273aab97d 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java @@ -828,4 +828,49 @@ public void testSetInitialCapacity() { assertEquals(1, vector.getDataVector().getValueCapacity()); } } + + @Test + public void testClearAndReuse() { + try (final ListVector vector = ListVector.empty("list", allocator)) { + BigIntVector bigIntVector = (BigIntVector) vector.addOrGetVector(FieldType.nullable(MinorType.BIGINT.getType())).getVector(); + vector.setInitialCapacity(10); + vector.allocateNew(); + + vector.startNewValue(0); + bigIntVector.setSafe(0, 7); + vector.endValue(0, 1); + vector.startNewValue(1); + bigIntVector.setSafe(1, 8); + vector.endValue(1, 1); + vector.setValueCount(2); + + Object result = vector.getObject(0); + ArrayList resultSet = (ArrayList) result; + assertEquals(new Long(7), resultSet.get(0)); + + result = vector.getObject(1); + resultSet = (ArrayList) result; + assertEquals(new Long(8), resultSet.get(0)); + + // Clear and release the buffers to trigger a realloc when adding next value + vector.clear(); + + // The list vector should reuse a buffer when reallocating the offset buffer + vector.startNewValue(0); + bigIntVector.setSafe(0, 7); + vector.endValue(0, 1); + vector.startNewValue(1); + bigIntVector.setSafe(1, 8); + vector.endValue(1, 1); + vector.setValueCount(2); + + result = vector.getObject(0); + resultSet = (ArrayList) result; + assertEquals(new Long(7), resultSet.get(0)); + + result = vector.getObject(1); + resultSet = (ArrayList) result; + assertEquals(new Long(8), resultSet.get(0)); + } + } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestOutOfMemoryForValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestOutOfMemoryForValueVector.java new file mode 100644 index 0000000000000..84c0fc33d9d4d --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestOutOfMemoryForValueVector.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.OutOfMemoryException; +import org.apache.arrow.memory.RootAllocator; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * This class tests cases where we expect to receive {@link OutOfMemoryException}. + */ +public class TestOutOfMemoryForValueVector { + + private final static String EMPTY_SCHEMA_PATH = ""; + + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(200); // Start with low memory limit + } + + @Test(expected = OutOfMemoryException.class) + public void variableWidthVectorAllocateNew() { + try (VarCharVector vector = new VarCharVector(EMPTY_SCHEMA_PATH, allocator)) { + vector.allocateNew(); + } + } + + @Test(expected = OutOfMemoryException.class) + public void variableWidthVectorAllocateNewCustom() { + try (VarCharVector vector = new VarCharVector(EMPTY_SCHEMA_PATH, allocator)) { + vector.allocateNew(2342, 234); + } + } + + @Test(expected = OutOfMemoryException.class) + public void fixedWidthVectorAllocateNew() { + try (IntVector vector = new IntVector(EMPTY_SCHEMA_PATH, allocator)) { + vector.allocateNew(); + } + } + + @Test(expected = OutOfMemoryException.class) + public void fixedWidthVectorAllocateNewCustom() { + try (IntVector vector = new IntVector(EMPTY_SCHEMA_PATH, allocator)) { + vector.allocateNew(2342); + } + } + + @After + public void terminate() { + allocator.close(); + } +} diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java index 5104962e9a3ed..cac5c59560f4c 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java @@ -1912,7 +1912,6 @@ public static void setBytes(int index, byte[] bytes, VarCharVector vector) { @Test /* VarCharVector */ public void testSetInitialCapacity() { try (final VarCharVector vector = new VarCharVector(EMPTY_SCHEMA_PATH, allocator)) { - /* use the default 8 data bytes on average per element */ vector.setInitialCapacity(4096); vector.allocateNew();