Commit 2274d03b authored by Mattias Finné
Browse files

Fixes an issue where a panic during import could cause a deadlock

parent 0af77afd
Showing with 121 additions and 7 deletions
+121 -7
......@@ -199,7 +199,12 @@ public abstract class AbstractStep<T> implements Step<T>
// stillWorking(), once false cannot again return true so no need to check
if ( !isCompleted() )
{
done();
// In the event of a panic, do not attempt any completion step at all, since that may entail sending more batches downstream
// or doing heavy end-result calculations
if ( !isPanic() )
{
done();
}
if ( downstream != null )
{
downstream.endOfUpstream();
......
......@@ -72,6 +72,14 @@ public class StageExecution implements StageControl, AutoCloseable
return false;
}
/**
 * Calls {@code awaitCompleted()} on every step of the pipeline, in pipeline iteration order,
 * and returns once the last of those calls has returned.
 *
 * @throws InterruptedException propagated from {@code Step#awaitCompleted()}; presumably raised when the
 * calling thread is interrupted while waiting (confirm against the {@code Step} contract).
 */
public void awaitCompletion() throws InterruptedException
{
    for ( Step<?> pipelineStep : pipeline )
    {
        pipelineStep.awaitCompleted();
    }
}
public void start()
{
for ( Step<?> step : pipeline )
......
......@@ -28,14 +28,17 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.neo4j.internal.batchimport.Configuration;
import org.neo4j.internal.batchimport.stats.Keys;
import org.neo4j.test.OtherThreadExecutor.WorkerCommand;
import org.neo4j.test.rule.OtherThreadRule;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.neo4j.internal.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;
class ProcessorStepTest
{
......@@ -60,7 +63,7 @@ class ProcessorStepTest
StageControl control = mock( StageControl.class );
try ( MyProcessorStep step = new MyProcessorStep( control, 0 ) )
{
step.start( Step.ORDER_SEND_DOWNSTREAM );
step.start( ORDER_SEND_DOWNSTREAM );
step.processors( 4 ); // now at 5
// WHEN
......@@ -94,9 +97,9 @@ class ProcessorStepTest
}
};
Future<Void> receiveFuture;
try ( ProcessorStep<Void> step = new BlockingProcessorStep( control, configuration, processors, latch ) )
try ( ProcessorStep<Void> step = new BlockingProcessorStep<>( control, configuration, processors, latch ) )
{
step.start( Step.ORDER_SEND_DOWNSTREAM );
step.start( ORDER_SEND_DOWNSTREAM );
step.processors( 1 ); // now at 2
// adding up to max processors should be fine
for ( int i = 0; i < processors + maxProcessors /* +1 since we allow queueing one more*/; i++ )
......@@ -121,7 +124,7 @@ class ProcessorStepTest
StageControl control = mock( StageControl.class );
try ( MyProcessorStep step = new MyProcessorStep( control, 0 ) )
{
step.start( Step.ORDER_SEND_DOWNSTREAM );
step.start( ORDER_SEND_DOWNSTREAM );
// WHEN
int batches = 10;
......@@ -137,7 +140,105 @@ class ProcessorStepTest
}
}
private static class BlockingProcessorStep extends ProcessorStep<Void>
/**
 * Panic propagation scenario where the failing processor is the last processor step added to the stage:
 * two processor steps are added and the one at index 1 is the failing one.
 */
@Test
public void shouldBeAbleToPropagatePanicOnBlockedProcessorsWhenLast() throws InterruptedException
{
    int numProcessors = 2;
    int failingProcessorIndex = 1; // == numProcessors - 1, i.e. the last processor step
    shouldBeAbleToPropagatePanicOnBlockedProcessors( numProcessors, failingProcessorIndex );
}
/**
 * Panic propagation scenario where the failing processor has another processor step after it:
 * three processor steps are added and the one at index 1 (the middle one) is the failing one.
 */
@Test
public void shouldBeAbleToPropagatePanicOnBlockedProcessorsWhenNotLast() throws InterruptedException
{
    int numProcessors = 3;
    int failingProcessorIndex = 1; // a regular processor step is added both before and after this one
    shouldBeAbleToPropagatePanicOnBlockedProcessors( numProcessors, failingProcessorIndex );
}
/**
 * Shared scenario for the panic-propagation tests.
 * <p>
 * Builds a stage consisting of an int producer (sending {@code maxNumberOfProcessors * 2} batches) followed by
 * {@code numProcessors} processor steps. The step at {@code failingProcessorIndex} blocks on a latch and, once the
 * latch is released, throws a {@link RuntimeException}; every other step simply forwards its batch downstream.
 * The test waits until the failing step has received at least {@code maxNumberOfProcessors + 1} batches, releases
 * the latch, awaits completion of the whole execution and then verifies that {@code assertHealthy()} throws the
 * expected exception.
 *
 * @param numProcessors number of processor steps to add after the producer.
 * @param failingProcessorIndex zero-based index, among the processor steps, of the step that blocks and then fails.
 * Must be in the range {@code [0, numProcessors)}, otherwise no failing step is created.
 * @throws InterruptedException if interrupted while sleeping or while awaiting completion.
 */
private void shouldBeAbleToPropagatePanicOnBlockedProcessors( int numProcessors, int failingProcessorIndex ) throws InterruptedException
{
    // Given
    String exceptionMessage = "Failing just for fun";
    Configuration configuration = Configuration.DEFAULT;
    CountDownLatch latch = new CountDownLatch( 1 );
    Stage stage = new Stage( "Test", "Part", configuration, ORDER_SEND_DOWNSTREAM );
    stage.add( intProducer( configuration, stage, configuration.maxNumberOfProcessors() * 2 ) );

    ProcessorStep<Integer> failingProcessor = null;
    for ( int index = 0; index < numProcessors; index++ )
    {
        if ( index != failingProcessorIndex )
        {
            // Regular pass-through step
            stage.add( intProcessor( configuration, stage ) );
            continue;
        }

        failingProcessor = new BlockingProcessorStep<>( stage.control(), configuration, 1, latch )
        {
            @Override
            protected void process( Integer batch, BatchSender sender ) throws Throwable
            {
                // Blocks here until the latch is counted down further below
                super.process( batch, sender );
                // Once unblocked, fail right away so that a panic will be issued
                throw new RuntimeException( exceptionMessage );
            }
        };
        stage.add( failingProcessor );
    }

    try
    {
        // When
        StageExecution execution = stage.execute();
        while ( true )
        {
            long receivedBatches = failingProcessor.stats().stat( Keys.received_batches ).asLong();
            if ( receivedBatches >= configuration.maxNumberOfProcessors() + 1 )
            {
                break;
            }
            Thread.sleep( 10 );
        }
        // Let the blocked processor continue, i.e. throw its exception
        latch.countDown();

        // Then
        execution.awaitCompletion();
        RuntimeException panicCause = assertThrows( RuntimeException.class, () -> execution.assertHealthy() );
        assertEquals( exceptionMessage, panicCause.getMessage() );
    }
    finally
    {
        stage.close();
    }
}
/**
 * Creates a producer step which, when processing, sends the integers {@code 0} (inclusive) up to
 * {@code batches} (exclusive) downstream, one value per send, in ascending order.
 * Its reported position is always {@code 0}.
 *
 * @param configuration configuration handed to the {@code ProducerStep} constructor.
 * @param stage stage whose control is handed to the {@code ProducerStep} constructor.
 * @param batches number of integers to send downstream.
 * @return the new producer step.
 */
private static ProducerStep intProducer( Configuration configuration, Stage stage, int batches )
{
    return new ProducerStep( stage.control(), configuration )
    {
        @Override
        protected long position()
        {
            return 0;
        }

        @Override
        protected void process()
        {
            int next = 0;
            while ( next < batches )
            {
                sendDownstream( next );
                next++;
            }
        }
    };
}
/**
 * Creates a pass-through processor step named {@code "processor"}: every received batch is sent
 * unchanged to the given sender.
 *
 * @param configuration configuration handed to the {@code ProcessorStep} constructor.
 * @param stage stage whose control is handed to the {@code ProcessorStep} constructor.
 * @return the new processor step.
 */
private static ProcessorStep<Integer> intProcessor( Configuration configuration, Stage stage )
{
    // NOTE(review): the trailing constructor argument 1 is presumably the max number of processors — confirm against ProcessorStep
    ProcessorStep<Integer> passThrough = new ProcessorStep<>( stage.control(), "processor", configuration, 1 )
    {
        @Override
        protected void process( Integer batch, BatchSender sender )
        {
            sender.send( batch );
        }
    };
    return passThrough;
}
private static class BlockingProcessorStep<T> extends ProcessorStep<T>
{
private final CountDownLatch latch;
......@@ -149,7 +250,7 @@ class ProcessorStepTest
}
@Override
protected void process( Void batch, BatchSender sender ) throws Throwable
protected void process( T batch, BatchSender sender ) throws Throwable
{
latch.await();
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment