Unverified Commit b01b41c3 authored by Martin Furmanski's avatar Martin Furmanski Committed by GitHub
Browse files

Merge pull request #10378 from martinfurmanski/3.2-new-inflight-cache

New cache for in-flight Raft entries
parents 27cd5a3b b60c9f3b
Showing with 929 additions and 51 deletions
+929 -51
......@@ -28,6 +28,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCacheFactory;
import org.neo4j.configuration.Description;
import org.neo4j.configuration.Internal;
import org.neo4j.configuration.LoadableConfig;
......@@ -123,6 +124,19 @@ public class CausalClusteringSettings implements LoadableConfig
setting( "causal_clustering.initial_discovery_members", list( ",", ADVERTISED_SOCKET_ADDRESS ),
NO_DEFAULT );
@Description( "Type of in-flight cache." )
public static final Setting<InFlightCacheFactory.Type> in_flight_cache_type =
setting( "causal_clustering.in_flight_cache.type", options( InFlightCacheFactory.Type.class, true ),
InFlightCacheFactory.Type.CONSECUTIVE.name() );
@Description( "The maximum number of entries in the in-flight cache." )
public static final Setting<Integer> in_flight_cache_max_entries =
setting( "causal_clustering.in_flight_cache.max_entries", INTEGER, "1024" );
@Description( "The maximum number of bytes in the in-flight cache." )
public static final Setting<Long> in_flight_cache_max_bytes =
setting( "causal_clustering.in_flight_cache.max_bytes", BYTES, "2G" );
public enum DiscoveryType
{
DNS,
......
......@@ -23,13 +23,13 @@ import java.io.File;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.EnterpriseCoreEditionModule;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.InMemoryRaftLog;
import org.neo4j.causalclustering.core.consensus.log.MonitoredRaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCacheFactory;
import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategy;
import org.neo4j.causalclustering.core.consensus.log.segmented.CoreLogPruningStrategyFactory;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.causalclustering.core.consensus.log.segmented.SegmentedRaftLog;
import org.neo4j.causalclustering.core.consensus.membership.MemberIdSetBuilder;
import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipManager;
......@@ -73,7 +73,7 @@ public class ConsensusModule
private final RaftMachine raftMachine;
private final DelayedRenewableTimeoutService raftTimeoutService;
private final RaftMembershipManager raftMembershipManager;
private final InFlightMap<RaftLogEntry> inFlightMap = new InFlightMap<>();
private final InFlightCache inFlightCache;
public ConsensusModule( MemberId myself, final PlatformModule platformModule,
Outbound<MemberId,RaftMessages.RaftMessage> outbound, File clusterStateDirectory,
......@@ -127,17 +127,18 @@ public class ConsensusModule
life.add( raftMembershipManager );
inFlightCache = InFlightCacheFactory.create( config, platformModule.monitors );
RaftLogShippingManager logShipping =
new RaftLogShippingManager( outbound, logProvider, raftLog, systemClock(), myself,
raftMembershipManager, electionTimeout, config.get( catchup_batch_size ),
config.get( log_shipping_max_lag ), inFlightMap );
config.get( log_shipping_max_lag ), inFlightCache );
raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider );
raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval,
raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightMap,
RefuseToBeLeaderStrategy.shouldRefuseToBeLeader( config, logProvider.getLog( getClass() ) ),
platformModule.monitors, systemClock() );
raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightCache,
RefuseToBeLeaderStrategy.shouldRefuseToBeLeader( config, logProvider.getLog( getClass() ) ), platformModule.monitors, systemClock() );
life.add( new RaftCoreTopologyConnector( coreTopologyService, raftMachine ) );
......@@ -195,8 +196,8 @@ public class ConsensusModule
return raftMembershipManager;
}
public InFlightMap<RaftLogEntry> inFlightMap()
public InFlightCache inFlightCache()
{
return inFlightMap;
return inFlightCache;
}
}
......@@ -28,9 +28,8 @@ import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.causalclustering.core.consensus.membership.RaftMembershipManager;
import org.neo4j.causalclustering.core.consensus.outcome.ConsensusOutcome;
import org.neo4j.causalclustering.core.consensus.outcome.Outcome;
......@@ -64,8 +63,8 @@ import static org.neo4j.causalclustering.core.consensus.roles.Role.LEADER;
public class RaftMachine implements LeaderLocator, CoreMetaData
{
private final LeaderNotFoundMonitor leaderNotFoundMonitor;
private final InFlightMap<RaftLogEntry> inFlightMap;
private RenewableTimeoutService.RenewableTimeout heartbeatTimer;
private InFlightCache inFlightCache;
public enum Timeouts implements RenewableTimeoutService.TimeoutName
{
......@@ -98,7 +97,7 @@ public class RaftMachine implements LeaderLocator, CoreMetaData
RaftLog entryLog, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService, Outbound<MemberId,RaftMessages.RaftMessage> outbound,
LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping,
InFlightMap<RaftLogEntry> inFlightMap, boolean refuseToBecomeLeader, Monitors monitors, Clock clock )
InFlightCache inFlightCache, boolean refuseToBecomeLeader, Monitors monitors, Clock clock )
{
this.myself = myself;
this.electionTimeout = electionTimeout;
......@@ -114,8 +113,8 @@ public class RaftMachine implements LeaderLocator, CoreMetaData
this.refuseToBecomeLeader = refuseToBecomeLeader;
this.clock = clock;
this.inFlightMap = inFlightMap;
this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightMap,
this.inFlightCache = inFlightCache;
this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightCache,
logProvider );
leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );
......@@ -137,7 +136,7 @@ public class RaftMachine implements LeaderLocator, CoreMetaData
renewing( () -> handle( new RaftMessages.Timeout.Heartbeat( myself ) ) ) );
}
inFlightMap.enable();
inFlightCache.enable();
}
public synchronized void stopTimers()
......
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus.log.cache;
import static java.lang.Math.floorMod;
import static java.util.Arrays.fill;
/**
* <pre>
* Design
*
* S: start index
* E: end index
*
* When S == E the buffer is empty.
*
* Examples:
*
* S
* v
* Empty [ | | | | | | ]
* ^
* E
*
*
* S
* v
* Size 2 [ | | | | | | ]
* ^
* E
*
*
* S
* v
* Full [ | | | | | | ]
* ^
* E
*
* New items are put at the current E, and then E is moved one step forward (circularly).
* The item at E is never a valid item.
*
* If moving E one step forward moves it onto S
* - then it knocks an element out
* - and S is also moved one step forward
*
* The S element has index 0.
* Removing an element moves S forward (circularly).
*
* @param <V> type of elements.
*/
public class CircularBuffer<V>
{
private final int arraySize; // externally visible capacity is arraySize - 1
private Object[] elementArr;
private int S;
private int E;
CircularBuffer( int capacity )
{
if ( capacity <= 0 )
{
throw new IllegalArgumentException( "Capacity must be > 0." );
}
this.arraySize = capacity + 1; // 1 item as sentinel (can't hold entries)
this.elementArr = new Object[arraySize];
}
/**
* Clears the underlying buffer and fills the provided eviction array with all evicted elements.
* The provided array must have the same capacity as the circular buffer.
*
* @param evictions Caller-provided array for evictions.
*/
public void clear( V[] evictions )
{
if ( evictions.length != arraySize - 1 )
{
throw new IllegalArgumentException( "The eviction array must be of the same size as the capacity of the circular buffer." );
}
int i = 0;
while ( S != E )
{
//noinspection unchecked
evictions[i++] = (V) elementArr[S];
S = pos( S, 1 );
}
S = 0;
E = 0;
fill( elementArr, null );
}
private int pos( int base, int delta )
{
return floorMod( base + delta, arraySize );
}
/**
* Append to the end of the buffer, possibly overwriting the
* oldest entry.
*
* @return any knocked out item, or null if nothing was knocked out.
*/
public V append( V e )
{
elementArr[E] = e;
E = pos( E, 1 );
if ( E == S )
{
//noinspection unchecked
V old = (V) elementArr[E];
elementArr[E] = null;
S = pos( S, 1 );
return old;
}
else
{
return null;
}
}
public V read( int idx )
{
//noinspection unchecked
return (V) elementArr[pos( S, idx )];
}
public V remove()
{
if ( S == E )
{
return null;
}
//noinspection unchecked
V e = (V) elementArr[S];
elementArr[S] = null;
S = pos( S, 1 );
return e;
}
public V removeHead()
{
if ( S == E )
{
return null;
}
E = pos( E, -1 );
//noinspection unchecked
V e = (V) elementArr[E];
elementArr[E] = null;
return e;
}
public int size()
{
return floorMod( E - S, arraySize );
}
}
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus.log.cache;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
/**
* Keeps elements over a limited consecutive range cached.
*
* @param <V> The type of element to cache.
*/
class ConsecutiveCache<V>
{
private final CircularBuffer<V> circle;
private long endIndex = -1;
ConsecutiveCache( int capacity )
{
this.circle = new CircularBuffer<>( capacity );
}
private long firstIndex()
{
return endIndex - circle.size() + 1;
}
void put( long idx, V e, V[] evictions )
{
if ( idx < 0 )
{
throw new IllegalArgumentException( format( "Index must be >= 0 (was %d)", idx ) );
}
if ( e == null )
{
throw new IllegalArgumentException( "Null entries are not accepted" );
}
if ( idx == endIndex + 1 )
{
evictions[0] = circle.append( e );
endIndex = endIndex + 1;
}
else
{
circle.clear( evictions );
circle.append( e );
endIndex = idx;
}
}
V get( long idx )
{
if ( idx < 0 )
{
throw new IllegalArgumentException( format( "Index must be >= 0 (was %d)", idx ) );
}
if ( idx > endIndex || idx < firstIndex() )
{
return null;
}
return circle.read( toIntExact( idx - firstIndex() ) );
}
public void clear( V[] evictions )
{
circle.clear( evictions );
}
public int size()
{
return circle.size();
}
public void prune( long upToIndex, V[] evictions )
{
long index = firstIndex();
int i = 0;
while ( index <= min( upToIndex, endIndex ) )
{
evictions[i] = circle.remove();
assert evictions[i] != null;
i++;
index++;
}
}
public V remove()
{
return circle.remove();
}
public void truncate( long fromIndex, V[] evictions )
{
if ( fromIndex > endIndex )
{
return;
}
long index = max( fromIndex, firstIndex() );
int i = 0;
while ( index <= endIndex )
{
evictions[i++] = circle.removeHead();
index++;
}
endIndex = fromIndex - 1;
}
}
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus.log.cache;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.state.machines.tx.CoreReplicatedContent;
/**
* A cache that keeps Raft log entries in memory, generally to bridge the gap
* between the time that a Raft log is being appended to the local Raft log and
* at a later time applied to the store. This cache optimises for the up-to-date
* case and does not cater specifically for lagging followers. It is better to let
* those catch up from entries read from disk where possible.
* <p>
* The cache relies on highly efficient underlying data structures (a circular
* buffer) and also allows on to specify a maximum bound on the number of entries
* as well as their total size where known, see {@link CoreReplicatedContent#hasSize()}.
*/
public class ConsecutiveInFlightCache implements InFlightCache
{
private final ConsecutiveCache<RaftLogEntry> cache;
private final RaftLogEntry[] evictions;
private final InFlightCacheMonitor monitor;
private long totalBytes;
private long maxBytes;
private boolean enabled;
public ConsecutiveInFlightCache()
{
this( 1024, 8 * 1024 * 1024, InFlightCacheMonitor.VOID, true );
}
public ConsecutiveInFlightCache( int capacity, long maxBytes, InFlightCacheMonitor monitor, boolean enabled )
{
this.cache = new ConsecutiveCache<>( capacity );
this.evictions = new RaftLogEntry[capacity];
this.maxBytes = maxBytes;
this.monitor = monitor;
this.enabled = enabled;
monitor.setMaxBytes( maxBytes );
monitor.setMaxElements( capacity );
}
@Override
public synchronized void enable()
{
enabled = true;
}
@Override
public synchronized void put( long logIndex, RaftLogEntry entry )
{
if ( !enabled )
{
return;
}
totalBytes += sizeOf( entry );
cache.put( logIndex, entry, evictions );
processEvictions();
while ( totalBytes > maxBytes )
{
RaftLogEntry evicted = cache.remove();
totalBytes -= sizeOf( evicted );
}
}
@Override
public synchronized RaftLogEntry get( long logIndex )
{
if ( !enabled )
{
return null;
}
RaftLogEntry entry = cache.get( logIndex );
if ( entry == null )
{
monitor.miss();
}
else
{
monitor.hit();
}
return entry;
}
@Override
public synchronized void truncate( long fromIndex )
{
if ( !enabled )
{
return;
}
cache.truncate( fromIndex, evictions );
processEvictions();
}
@Override
public synchronized void prune( long upToIndex )
{
if ( !enabled )
{
return;
}
cache.prune( upToIndex, evictions );
processEvictions();
}
@Override
public synchronized long totalBytes()
{
return totalBytes;
}
@Override
public synchronized int elementCount()
{
return cache.size();
}
private long sizeOf( RaftLogEntry entry )
{
return entry.content().hasSize() ? entry.content().size() : 0;
}
private void processEvictions()
{
for ( int i = 0; i < evictions.length; i++ )
{
RaftLogEntry entry = evictions[i];
if ( entry == null )
{
break;
}
evictions[i] = null;
totalBytes -= sizeOf( entry );
}
monitor.setTotalBytes( totalBytes );
monitor.setElementCount( cache.size() );
}
}
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus.log.cache;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
/**
* A cache for in-flight entries which also tracks the size of the cache.
*/
public interface InFlightCache
{
/**
* Enables the cache.
*/
void enable();
/**
* Put item into the cache.
*
* @param logIndex the index of the log entry.
* @param entry the Raft log entry.
*/
void put( long logIndex, RaftLogEntry entry );
/**
* Get item from the cache.
*
* @param logIndex the index of the log entry.
* @return the log entry.
*/
RaftLogEntry get( long logIndex );
/**
* Disposes of a range of elements from the tail of the consecutive cache.
*
* @param fromIndex the index to start from (inclusive).
*/
void truncate( long fromIndex );
/**
* Prunes items from the cache.
*
* @param upToIndex the last index to prune (inclusive).
*/
void prune( long upToIndex );
/**
* @return the amount of data in the cache.
*/
long totalBytes();
/**
* @return the number of log entries in the cache.
*/
int elementCount();
}
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus.log.cache;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.monitoring.Monitors;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.in_flight_cache_max_bytes;
import static org.neo4j.causalclustering.core.CausalClusteringSettings.in_flight_cache_max_entries;
public class InFlightCacheFactory
{
public static InFlightCache create( Config config, Monitors monitors )
{
return config.get( CausalClusteringSettings.in_flight_cache_type ).create( config, monitors );
}
public enum Type
{
NONE
{
@Override
InFlightCache create( Config config, Monitors monitors )
{
return new VoidInFlightCache();
}
},
CONSECUTIVE
{
@Override
InFlightCache create( Config config, Monitors monitors )
{
return new ConsecutiveInFlightCache( config.get( in_flight_cache_max_entries ), config.get( in_flight_cache_max_bytes ),
monitors.newMonitor( InFlightCacheMonitor.class ), false );
}
},
UNBOUNDED
{
@Override
InFlightCache create( Config config, Monitors monitors )
{
return new UnboundedInFlightCache();
}
};
abstract InFlightCache create( Config config, Monitors monitors );
}
}
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus.log.cache;
public interface InFlightCacheMonitor
{
InFlightCacheMonitor VOID = new InFlightCacheMonitor()
{
@Override
public void miss()
{
}
@Override
public void hit()
{
}
@Override
public void setMaxBytes( long maxBytes )
{
}
@Override
public void setTotalBytes( long totalBytes )
{
}
@Override
public void setMaxElements( int maxElements )
{
}
@Override
public void setElementCount( int elementCount )
{
}
};
void miss();
void hit();
void setMaxBytes( long maxBytes );
void setTotalBytes( long totalBytes );
void setMaxElements( int maxElements );
void setElementCount( int elementCount );
}
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus.log.cache;
import java.util.HashMap;
import java.util.Map;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
/**
* This cache is not meant for production use, but it can be useful
* in various investigative circumstances.
*/
public class UnboundedInFlightCache implements InFlightCache
{
private Map<Long,RaftLogEntry> map = new HashMap<>();
private boolean enabled;
@Override
public synchronized void enable()
{
enabled = true;
}
@Override
public synchronized void put( long logIndex, RaftLogEntry entry )
{
if ( !enabled )
{
return;
}
map.put( logIndex, entry );
}
@Override
public synchronized RaftLogEntry get( long logIndex )
{
if ( !enabled )
{
return null;
}
return map.get( logIndex );
}
@Override
public synchronized void truncate( long fromIndex )
{
if ( !enabled )
{
return;
}
map.keySet().removeIf( idx -> idx >= fromIndex );
}
@Override
public synchronized void prune( long upToIndex )
{
if ( !enabled )
{
return;
}
map.keySet().removeIf( idx -> idx <= upToIndex );
}
@Override
public synchronized long totalBytes()
{
// not updated correctly
return 0;
}
@Override
public synchronized int elementCount()
{
// not updated correctly
return 0;
}
}
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core.consensus.log.cache;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
/**
* A cache which caches nothing. This means that all lookups
* will go to the on-disk Raft log, which might be quite good
* anyway since recently written items will be in OS page cache
* memory generally. But it will incur an unmarshalling overhead.
*/
public class VoidInFlightCache implements InFlightCache
{
@Override
public void enable()
{
}
@Override
public void put( long logIndex, RaftLogEntry entry )
{
}
@Override
public RaftLogEntry get( long logIndex )
{
return null;
}
@Override
public void truncate( long fromIndex )
{
}
@Override
public void prune( long upToIndex )
{
}
@Override
public long totalBytes()
{
return 0;
}
@Override
public int elementCount()
{
return 0;
}
}
......@@ -22,9 +22,9 @@ package org.neo4j.causalclustering.core.consensus.outcome;
import java.io.IOException;
import java.util.Objects;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.logging.Log;
public class AppendLogEntry implements RaftLogCommand
......@@ -49,9 +49,9 @@ public class AppendLogEntry implements RaftLogCommand
}
@Override
public void applyTo( InFlightMap<RaftLogEntry> inFlightMap, Log log ) throws IOException
public void applyTo( InFlightCache inFlightCache, Log log ) throws IOException
{
inFlightMap.put( index, entry );
inFlightCache.put( index, entry );
}
@Override
......
......@@ -23,9 +23,9 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.logging.Log;
import static java.lang.String.format;
......@@ -62,11 +62,11 @@ public class BatchAppendLogEntries implements RaftLogCommand
}
@Override
public void applyTo( InFlightMap<RaftLogEntry> inFlightMap, Log log )
public void applyTo( InFlightCache inFlightCache, Log log )
{
for ( int i = offset; i < entries.length; i++ )
{
inFlightMap.put( baseIndex + i , entries[i]);
inFlightCache.put( baseIndex + i , entries[i]);
}
}
......
......@@ -22,9 +22,8 @@ package org.neo4j.causalclustering.core.consensus.outcome;
import java.io.IOException;
import java.util.Objects;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.logging.Log;
public class PruneLogCommand implements RaftLogCommand
......@@ -49,7 +48,7 @@ public class PruneLogCommand implements RaftLogCommand
}
@Override
public void applyTo( InFlightMap<RaftLogEntry> inFlightMap, Log log ) throws IOException
public void applyTo( InFlightCache inFlightCache, Log log ) throws IOException
{
// only the actual log prunes
}
......
......@@ -21,9 +21,9 @@ package org.neo4j.causalclustering.core.consensus.outcome;
import java.io.IOException;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.logging.Log;
public interface RaftLogCommand
......@@ -39,5 +39,5 @@ public interface RaftLogCommand
void applyTo( RaftLog raftLog, Log log ) throws IOException;
void applyTo( InFlightMap<RaftLogEntry> inFlightMap, Log log ) throws IOException;
void applyTo( InFlightCache inFlightCache, Log log ) throws IOException;
}
......@@ -22,9 +22,8 @@ package org.neo4j.causalclustering.core.consensus.outcome;
import java.io.IOException;
import java.util.Objects;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.logging.Log;
public class TruncateLogCommand implements RaftLogCommand
......@@ -49,10 +48,10 @@ public class TruncateLogCommand implements RaftLogCommand
}
@Override
public void applyTo( InFlightMap<RaftLogEntry> inFlightMap, Log log ) throws IOException
public void applyTo( InFlightCache inFlightCache, Log log ) throws IOException
{
log.debug( "Start truncating in-flight-map from index %d. Current map:%n%s", fromIndex, inFlightMap );
inFlightMap.truncate( fromIndex );
log.debug( "Start truncating in-flight-map from index %d. Current map:%n%s", fromIndex, inFlightCache );
inFlightCache.truncate( fromIndex );
}
@Override
......
......@@ -22,13 +22,13 @@ package org.neo4j.causalclustering.core.consensus.shipping;
import java.io.IOException;
import java.time.Clock;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.schedule.DelayedRenewableTimeoutService;
import org.neo4j.causalclustering.core.consensus.LeaderContext;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.schedule.RenewableTimeoutService;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.causalclustering.core.state.InFlightLogEntryReader;
import org.neo4j.causalclustering.identity.MemberId;
......@@ -111,7 +111,7 @@ public class RaftLogShipper
private final long retryTimeMillis;
private final int catchupBatchSize;
private final int maxAllowedShippingLag;
private final InFlightMap<RaftLogEntry> inFlightMap;
private final InFlightCache inFlightCache;
private DelayedRenewableTimeoutService timeoutService;
private RenewableTimeout timeout;
......@@ -124,7 +124,7 @@ public class RaftLogShipper
RaftLogShipper( Outbound<MemberId, RaftMessages.RaftMessage> outbound, LogProvider logProvider,
ReadableRaftLog raftLog, Clock clock,
MemberId leader, MemberId follower, long leaderTerm, long leaderCommit, long retryTimeMillis,
int catchupBatchSize, int maxAllowedShippingLag, InFlightMap<RaftLogEntry> inFlightMap )
int catchupBatchSize, int maxAllowedShippingLag, InFlightCache inFlightCache )
{
this.outbound = outbound;
this.catchupBatchSize = catchupBatchSize;
......@@ -137,7 +137,7 @@ public class RaftLogShipper
this.leader = leader;
this.retryTimeMillis = retryTimeMillis;
this.lastLeaderContext = new LeaderContext( leaderTerm, leaderCommit );
this.inFlightMap = inFlightMap;
this.inFlightCache = inFlightCache;
}
public Object identity()
......@@ -480,7 +480,7 @@ public class RaftLogShipper
}
boolean entryMissing = false;
try ( InFlightLogEntryReader logEntrySupplier = new InFlightLogEntryReader( raftLog, inFlightMap, false ) )
try ( InFlightLogEntryReader logEntrySupplier = new InFlightLogEntryReader( raftLog, inFlightCache, false ) )
{
for ( int offset = 0; offset < batchSize; offset++ )
{
......
......@@ -27,9 +27,8 @@ import java.util.Map;
import org.neo4j.causalclustering.core.consensus.LeaderContext;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.causalclustering.core.consensus.membership.RaftMembership;
import org.neo4j.causalclustering.core.consensus.outcome.ShipCommand;
import org.neo4j.causalclustering.identity.MemberId;
......@@ -51,7 +50,7 @@ public class RaftLogShippingManager extends LifecycleAdapter implements RaftMemb
private final long retryTimeMillis;
private final int catchupBatchSize;
private final int maxAllowedShippingLag;
private final InFlightMap<RaftLogEntry> inFlightMap;
private final InFlightCache inFlightCache;
private Map<MemberId,RaftLogShipper> logShippers = new HashMap<>();
private LeaderContext lastLeaderContext;
......@@ -63,7 +62,7 @@ public class RaftLogShippingManager extends LifecycleAdapter implements RaftMemb
ReadableRaftLog raftLog,
Clock clock, MemberId myself, RaftMembership membership, long retryTimeMillis,
int catchupBatchSize, int maxAllowedShippingLag,
InFlightMap<RaftLogEntry> inFlightMap )
InFlightCache inFlightCache )
{
this.outbound = outbound;
this.logProvider = logProvider;
......@@ -74,7 +73,7 @@ public class RaftLogShippingManager extends LifecycleAdapter implements RaftMemb
this.retryTimeMillis = retryTimeMillis;
this.catchupBatchSize = catchupBatchSize;
this.maxAllowedShippingLag = maxAllowedShippingLag;
this.inFlightMap = inFlightMap;
this.inFlightCache = inFlightCache;
membership.registerListener( this );
}
......@@ -116,20 +115,19 @@ public class RaftLogShippingManager extends LifecycleAdapter implements RaftMemb
stopped = true;
}
private RaftLogShipper ensureLogShipperRunning( MemberId member, LeaderContext leaderContext )
private void ensureLogShipperRunning( MemberId member, LeaderContext leaderContext )
{
RaftLogShipper logShipper = logShippers.get( member );
if ( logShipper == null && !member.equals( myself ) )
{
logShipper = new RaftLogShipper( outbound, logProvider, raftLog, clock, myself, member,
leaderContext.term, leaderContext.commitIndex, retryTimeMillis, catchupBatchSize,
maxAllowedShippingLag, inFlightMap );
maxAllowedShippingLag, inFlightCache );
logShippers.put( member, logShipper );
logShipper.start();
}
return logShipper;
}
public synchronized void handleCommands( Iterable<ShipCommand> shipCommands, LeaderContext leaderContext ) throws IOException
......
......@@ -23,10 +23,9 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.neo4j.causalclustering.core.consensus.log.cache.InFlightCache;
import org.neo4j.causalclustering.core.consensus.log.RaftLog;
import org.neo4j.causalclustering.core.consensus.log.RaftLogEntry;
import org.neo4j.causalclustering.core.consensus.log.ReadableRaftLog;
import org.neo4j.causalclustering.core.consensus.log.segmented.InFlightMap;
import org.neo4j.causalclustering.core.consensus.membership.RaftMembership;
import org.neo4j.causalclustering.core.consensus.outcome.Outcome;
import org.neo4j.causalclustering.core.consensus.outcome.RaftLogCommand;
......@@ -46,7 +45,7 @@ public class RaftState implements ReadableRaftState
private final RaftMembership membership;
private final Log log;
private final RaftLog entryLog;
private final InFlightMap<RaftLogEntry> inFlightMap;
private final InFlightCache inFlightCache;
private TermState termState;
private VoteState voteState;
......@@ -64,14 +63,14 @@ public class RaftState implements ReadableRaftState
RaftMembership membership,
RaftLog entryLog,
StateStorage<VoteState> voteStorage,
InFlightMap<RaftLogEntry> inFlightMap, LogProvider logProvider )
InFlightCache inFlightCache, LogProvider logProvider )
{
this.myself = myself;
this.termStorage = termStorage;
this.voteStorage = voteStorage;
this.membership = membership;
this.entryLog = entryLog;
this.inFlightMap = inFlightMap;
this.inFlightCache = inFlightCache;
log = logProvider.getLog( getClass() );
}
......@@ -194,7 +193,7 @@ public class RaftState implements ReadableRaftState
for ( RaftLogCommand logCommand : outcome.getLogCommands() )
{
logCommand.applyTo( entryLog, log );
logCommand.applyTo( inFlightMap, log );
logCommand.applyTo( inFlightCache, log );
}
commitIndex = outcome.getCommitIndex();
}
......
......@@ -61,6 +61,18 @@ public class DistributedOperation implements ReplicatedContent
return content;
}
@Override
public boolean hasSize()
{
return content.hasSize();
}
@Override
public long size()
{
return content.size();
}
public void serialize( WritableChannel channel ) throws IOException
{
channel.putLong( globalSession().sessionId().getMostSignificantBits() );
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment