Skip to content

Commit

Permalink
[SP-1697] - Regression PDI-13435 - Mapping (sub-transformation) step …
Browse files Browse the repository at this point in the history
…returns an error (5.3 Suite)
  • Loading branch information
Andrey Khayrutdinov committed Mar 3, 2015
1 parent f486c85 commit 60771de
Show file tree
Hide file tree
Showing 6 changed files with 742 additions and 31 deletions.
54 changes: 24 additions & 30 deletions engine/src/org/pentaho/di/trans/step/BaseStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -1273,11 +1273,7 @@ public void putRow( RowMetaInterface rowMeta, Object[] row ) throws KettleStepEx
private void mirrorPartitioning( RowMetaInterface rowMeta, Object[] row ) {
for ( int r = 0; r < outputRowSets.size(); r++ ) {
RowSet rowSet = outputRowSets.get( r );
while ( !rowSet.putRow( rowMeta, row ) ) {
if ( isStopped() ) {
break;
}
}
putRowToRowSet( rowSet, rowMeta, row );
}
}

Expand Down Expand Up @@ -1378,11 +1374,7 @@ private void specialPartitioning( RowMetaInterface rowMeta, Object[] row ) throw
logBasic( BaseMessages.getString( PKG, "BaseStep.TargetRowsetIsNotAvailable", partitionNr ) );
} else {
// Wait
while ( !selectedRowSet.putRow( rowMeta, row ) ) {
if ( isStopped() ) {
break;
}
}
putRowToRowSet( selectedRowSet, rowMeta, row );
incrementLinesWritten();

if ( log.isRowLevel() ) {
Expand Down Expand Up @@ -1410,11 +1402,7 @@ private void specialPartitioning( RowMetaInterface rowMeta, Object[] row ) throw
} else {

// Wait
while ( !selectedRowSet.putRow( rowMeta, row ) ) {
if ( isStopped() ) {
break;
}
}
putRowToRowSet( selectedRowSet, rowMeta, row );
incrementLinesWritten();

if ( log.isRowLevel() ) {
Expand Down Expand Up @@ -1458,11 +1446,7 @@ private void noPartitioning( RowMetaInterface rowMeta, Object[] row ) throws Ket

// Loop until we find room in the target rowset
//
while ( !rs.putRow( rowMeta, row ) ) {
if ( isStopped() ) {
break;
}
}
putRowToRowSet( rs, rowMeta, row );
incrementLinesWritten();

// Now determine the next output rowset!
Expand Down Expand Up @@ -1499,11 +1483,7 @@ private void noPartitioning( RowMetaInterface rowMeta, Object[] row ) throws Ket
try {
// Loop until we find room in the target rowset
//
while ( !rs.putRow( rowMeta, rowMeta.cloneRow( row ) ) ) {
if ( isStopped() ) {
break;
}
}
putRowToRowSet( rs, rowMeta, rowMeta.cloneRow( row ) );
incrementLinesWritten();
} catch ( KettleValueException e ) {
throw new KettleStepException( "Unable to clone row while copying rows to multiple target steps", e );
Expand All @@ -1513,15 +1493,29 @@ private void noPartitioning( RowMetaInterface rowMeta, Object[] row ) throws Ket
// set row in first output rowset
//
RowSet rs = outputRowSets.get( 0 );
while ( !rs.putRow( rowMeta, row ) ) {
if ( isStopped() ) {
break;
}
}
putRowToRowSet( rs, rowMeta, row );
incrementLinesWritten();
}
}

private void putRowToRowSet( RowSet rs, RowMetaInterface rowMeta, Object[] row ) {
RowMetaInterface toBeSent;
RowMetaInterface metaFromRs = rs.getRowMeta();
if ( metaFromRs == null ) {
// RowSet is not initialised so far
toBeSent = rowMeta.clone();
} else {
// use the existing
toBeSent = metaFromRs;
}

while ( !rs.putRow( toBeSent, row ) ) {
if ( isStopped() ) {
return;
}
}
}

/**
* putRowTo is used to put a row in a certain specific RowSet.
*
Expand Down
41 changes: 40 additions & 1 deletion engine/test-src/org/pentaho/di/trans/step/BaseStepTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
import org.mockito.stubbing.Answer;
import org.pentaho.di.core.BlockingRowSet;
import org.pentaho.di.core.RowSet;
import org.pentaho.di.core.SingleRowRowSet;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.logging.LoggingObjectInterface;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaBase;
import org.pentaho.di.core.row.value.ValueMetaInteger;
import org.pentaho.di.core.row.value.ValueMetaString;
import org.pentaho.di.trans.BasePartitioner;
import org.pentaho.di.trans.steps.mock.StepMockHelper;

Expand All @@ -43,7 +46,7 @@ public void tearDown() {

/**
* This test checks that data from one non-partitioned step copies to 2 partitioned steps right.
*
*
* @see {@link <a href="http://jira.pentaho.com/browse/PDI-12211">http://jira.pentaho.com/browse/PDI-12211<a>}
* @throws KettleException
*/
Expand Down Expand Up @@ -172,4 +175,40 @@ public void run() {
addListeners.join();
}
}

@Test
public void outputRowMetasAreNotSharedAmongSeveralStreams() throws Exception {
RowSet rs1 = new SingleRowRowSet();
RowSet rs2 = new SingleRowRowSet();

when( mockHelper.logChannelInterfaceFactory.create( any(), any( LoggingObjectInterface.class ) ) ).thenReturn(
mockHelper.logChannelInterface );
when( mockHelper.trans.isRunning() ).thenReturn( true );
BaseStep baseStep =
new BaseStep( mockHelper.stepMeta, mockHelper.stepDataInterface, 0, mockHelper.transMeta, mockHelper.trans );
baseStep.setStopped( false );
baseStep.setRepartitioning( StepPartitioningMeta.PARTITIONING_METHOD_NONE );
baseStep.setOutputRowSets( Arrays.asList( rs1, rs2 ) );

for ( RowSet rowSet : baseStep.getOutputRowSets() ) {
assertNull( "RowMeta should be null, since no calls were done", rowSet.getRowMeta() );
}

RowMetaInterface rowMeta = new RowMeta();
rowMeta.addValueMeta( new ValueMetaString( "string" ) );
rowMeta.addValueMeta( new ValueMetaInteger( "integer" ) );

baseStep.putRow( rowMeta, new Object[] { "a", 1 } );

RowMetaInterface meta1 = rs1.getRowMeta();
RowMetaInterface meta2 = rs2.getRowMeta();
assertNotNull( meta1 );
assertNotNull( meta2 );
// content is same
for ( ValueMetaInterface meta : meta1.getValueMetaList() ) {
assertTrue( meta.getName(), meta2.exists( meta ) );
}
// whereas instances differ
assertFalse( meta1 == meta2 );
}
}
14 changes: 14 additions & 0 deletions test/org/pentaho/di/trans/steps/mapping/MappingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,18 @@ public void testInfoStreams_with_main_data_path() throws Exception {
assertEquals( "Expected a single Info Stream", 1, ioMeta.getInfoStreams().size() );
assertEquals( "Expected a single Info Step", 1, loadedMappingMeta.getInfoSteps().length );
}

public void testMapping_WhenSharingPreviousStepWithAnother() throws Exception {
KettleEnvironment.init();

TransMeta transMeta = new TransMeta( "testfiles/org/pentaho/di/trans/steps/mapping/pdi-13435/PDI-13435-main.ktr" );
transMeta.setTransformationType( TransMeta.TransformationType.Normal );

Trans trans = new Trans( transMeta );
trans.prepareExecution( null );
trans.startThreads();
trans.waitUntilFinished();

assertEquals( 0, trans.getErrors() );
}
}
Loading

0 comments on commit 60771de

Please sign in to comment.