Commit 468d83dd authored by Anton Klarén

Allow cross-version copy in neo4j-admin copy

The copy command can be used to clean up inconsistencies, compact stores, and perform an upgrade at the same time. It also lets us support a wider range of upgrades, e.g. 3.4 directly to 4.1, skipping the intermediate steps of 3.5.latest -> 4.0.latest -> 4.1. Those intermediate steps exist to migrate the schema store, but since the schema store is not copied at all during a copy, they are not needed here. The schema has to be re-created afterwards on the target database.
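For example, a direct 3.4-to-4.1 copy could look something like the following (a hypothetical invocation; check the 4.x neo4j-admin documentation for the exact flag names): neo4j-admin copy --from-path=/data/old-3.4-store/graph.db --to-database=neo4j. The CREATE INDEX / CREATE CONSTRAINT statements that the command prints (and writes to its log file) then have to be run against the new database to restore the schema.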
Showing with 157 additions and 44 deletions
@@ -31,16 +31,21 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.neo4j.collection.Dependencies;
import org.neo4j.configuration.Config;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector;
import org.neo4j.internal.batchimport.AdditionalInitialIds;
import org.neo4j.internal.batchimport.BatchImporter;
import org.neo4j.internal.batchimport.BatchImporterFactory;
@@ -58,6 +63,7 @@ import org.neo4j.internal.kernel.api.TokenRead;
import org.neo4j.internal.recordstorage.SchemaRuleAccess;
import org.neo4j.internal.schema.ConstraintDescriptor;
import org.neo4j.internal.schema.IndexDescriptor;
import org.neo4j.internal.schema.SchemaRule;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.layout.DatabaseLayout;
@@ -68,6 +74,10 @@ import org.neo4j.io.pagecache.impl.muninn.MuninnPageCache;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.io.pagecache.tracing.cursor.DefaultPageCursorTracerSupplier;
import org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier;
import org.neo4j.kernel.extension.DatabaseExtensions;
import org.neo4j.kernel.extension.ExtensionFactory;
import org.neo4j.kernel.extension.context.DatabaseExtensionContext;
import org.neo4j.kernel.impl.factory.DatabaseInfo;
import org.neo4j.kernel.impl.store.CommonAbstractStore;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.NodeStore;
@@ -80,15 +90,25 @@ import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.format.standard.Standard;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.storemigration.RecordStorageMigrator;
import org.neo4j.kernel.impl.storemigration.SchemaStorage;
import org.neo4j.kernel.impl.storemigration.legacy.SchemaStorage35;
import org.neo4j.kernel.impl.storemigration.legacy.SchemaStore35;
import org.neo4j.kernel.impl.transaction.log.files.TransactionLogInitializer;
import org.neo4j.kernel.impl.transaction.state.DefaultIndexProviderMap;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.DuplicatingLogProvider;
import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.logging.internal.NullLogService;
import org.neo4j.logging.internal.SimpleLogService;
import org.neo4j.memory.EmptyMemoryTracker;
import org.neo4j.monitoring.Monitors;
import org.neo4j.procedure.builtin.SchemaStatementProcedure;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.service.Services;
import org.neo4j.token.TokenHolders;
import org.neo4j.token.api.NamedToken;
import org.neo4j.token.api.NonUniqueTokenException;
@@ -103,10 +123,14 @@ import static org.neo4j.internal.batchimport.ImportLogic.NO_MONITOR;
import static org.neo4j.internal.recordstorage.StoreTokens.allReadableTokens;
import static org.neo4j.internal.recordstorage.StoreTokens.createReadOnlyTokenHolder;
import static org.neo4j.io.mem.MemoryAllocator.createAllocator;
import static org.neo4j.kernel.extension.ExtensionFailureStrategies.ignore;
import static org.neo4j.kernel.impl.scheduler.JobSchedulerFactory.createInitialisedScheduler;
import static org.neo4j.kernel.impl.store.format.RecordFormatSelector.findLatestFormatInFamily;
import static org.neo4j.kernel.impl.store.format.RecordStorageCapability.FLEXIBLE_SCHEMA_STORE;
import static org.neo4j.kernel.impl.storemigration.IndexConfigMigrator.migrateIndexConfig;
import static org.neo4j.kernel.impl.storemigration.IndexProviderMigrator.upgradeIndexProvider;
import static org.neo4j.logging.Level.DEBUG;
import static org.neo4j.logging.Level.INFO;
import static org.neo4j.procedure.builtin.SchemaStatementProcedure.createStatement;
public class StoreCopy
{
@@ -171,11 +195,10 @@ public class StoreCopy
recreatedTokens = Maps.mutable.empty();
tokenHolders = createTokenHolders( neoStores );
stats = new StoreCopyStats( log );
SchemaStore schemaStore = neoStores.getSchemaStore();
storeCopyFilter = convertFilter( deleteNodesWithLabels, skipLabels, skipProperties, skipRelationships, tokenHolders, stats );
RecordFormats recordFormats = setupRecordFormats( neoStores, format );
RecordFormats recordFormats = setupRecordFormats( neoStores.getRecordFormats(), format );
ExecutionMonitor executionMonitor = verbose ? new SpectrumExecutionMonitor( 2, TimeUnit.SECONDS, out,
SpectrumExecutionMonitor.DEFAULT_WIDTH ) : ExecutionMonitors.defaultVisible();
@@ -197,7 +220,17 @@
// Display schema information
log.info( "### Extracting schema ###" );
log.info( "Trying to extract schema..." );
Map<String,String> schemaStatements = getSchemaStatements( stats, schemaStore, tokenHolders );
Collection<String> schemaStatements;
if ( neoStores.getRecordFormats().hasCapability( FLEXIBLE_SCHEMA_STORE ) )
{
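// 4.0+ formats keep schema rules in the flexible schema store, which the current parser can read directly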
schemaStatements = getSchemaStatements40( stats, neoStores.getSchemaStore(), tokenHolders );
}
else
{
// Prior to 4.0, try to read with 3.5 parser
schemaStatements = getSchemaStatements35( log, neoStores.getRecordFormats(), fromPageCache, fs, tokenHolders );
}
int schemaCount = schemaStatements.size();
if ( schemaCount == 0 )
{
@@ -207,7 +240,7 @@
{
log.info( "... found %d schema definitions. The following can be used to recreate the schema:", schemaCount );
String newLine = System.lineSeparator();
log.info( newLine + newLine + String.join( ";" + newLine, schemaStatements.values() ) );
log.info( newLine + newLine + String.join( ";" + newLine, schemaStatements ) );
log.info( "You have to manually apply the above commands to the database when it is stared to recreate the indexes and constraints. " +
"The commands are saved to " + logFilePath.toAbsolutePath() + " as well for reference.");
}
@@ -423,53 +456,128 @@ public class StoreCopy
return config.get( logs_directory ).resolve( format( "neo4j-admin-copy-%s.log", new SimpleDateFormat( "yyyy-MM-dd.HH.mm.ss" ).format( new Date() ) ) );
}
private static Map<String,String> getSchemaStatements( StoreCopyStats stats, SchemaStore schemaStore,
private static Collection<String> getSchemaStatements40( StoreCopyStats stats, SchemaStore schemaStore,
TokenHolders tokenHolders )
{
TokenRead tokenRead = new ReadOnlyTokenRead( tokenHolders );
SchemaRuleAccess schemaRuleAccess = SchemaRuleAccess.getSchemaRuleAccess( schemaStore, tokenHolders );
Map<String,IndexDescriptor> indexes = new HashMap<>();
List<ConstraintDescriptor> constraints = new ArrayList<>();
SchemaRuleAccess schemaRuleAccess = SchemaRuleAccess.getSchemaRuleAccess( schemaStore, tokenHolders );
schemaRuleAccess.indexesGetAllIgnoreMalformed().forEachRemaining( i -> indexes.put( i.getName(), i ) );
schemaRuleAccess.constraintsGetAllIgnoreMalformed().forEachRemaining(constraints::add );
return getSchemaStatements( stats, tokenHolders, indexes, constraints );
}
private Collection<String> getSchemaStatements35( Log log, RecordFormats recordFormats, PageCache fromPageCache, FileSystemAbstraction fs,
TokenHolders tokenHolders )
{
Map<String,IndexDescriptor> indexes = new HashMap<>();
List<ConstraintDescriptor> constraints = new ArrayList<>();
// Open store with old reader
LifeSupport life = new LifeSupport();
try ( SchemaStore35 schemaStore35 = new SchemaStore35( from.schemaStore(), from.idSchemaStore(), config, org.neo4j.internal.id.IdType.SCHEMA,
new ScanOnOpenReadOnlyIdGeneratorFactory(), fromPageCache, NullLogProvider.getInstance(), recordFormats ) )
{
schemaStore35.initialise( true );
SchemaStorage schemaStorage35 = new SchemaStorage35( schemaStore35 );
// Load index providers
Dependencies deps = new Dependencies();
Monitors monitors = new Monitors();
deps.satisfyDependencies( fs, config, fromPageCache, NullLogService.getInstance(), monitors, RecoveryCleanupWorkCollector.immediate() );
DatabaseExtensionContext extensionContext = new DatabaseExtensionContext( from, DatabaseInfo.UNKNOWN, deps );
Iterable<?> extensionFactories = Services.loadAll( ExtensionFactory.class );
@SuppressWarnings( "unchecked" )
DatabaseExtensions databaseExtensions =
life.add( new DatabaseExtensions( extensionContext, (Iterable<ExtensionFactory<?>>) extensionFactories, deps, ignore() ) );
DefaultIndexProviderMap indexProviderMap = life.add( new DefaultIndexProviderMap( databaseExtensions, config ) );
life.start();
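// Starting the life brings up the loaded extensions and the index provider map needed by migrateIndexConfig below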
// Get rules and migrate to latest
Map<Long,SchemaRule> ruleById = new LinkedHashMap<>();
schemaStorage35.getAll().forEach( rule -> ruleById.put( rule.getId(), rule ) );
RecordStorageMigrator.schemaGenerateNames( schemaStorage35, tokenHolders, ruleById );
for ( Map.Entry<Long,SchemaRule> entry : ruleById.entrySet() )
{
SchemaRule schemaRule = entry.getValue();
if ( schemaRule instanceof IndexDescriptor )
{
IndexDescriptor indexDescriptor = (IndexDescriptor) schemaRule;
try
{
indexDescriptor = (IndexDescriptor) migrateIndexConfig( indexDescriptor, from, fs, fromPageCache, indexProviderMap, log );
indexDescriptor = (IndexDescriptor) upgradeIndexProvider( indexDescriptor );
indexes.put( indexDescriptor.getName(), indexDescriptor );
}
catch ( IOException e )
{
stats.invalidIndex( indexDescriptor, e );
}
}
else if ( schemaRule instanceof ConstraintDescriptor )
{
constraints.add( (ConstraintDescriptor) schemaRule );
}
}
}
catch ( Exception e )
{
log.error( format( "Failed to read schema store %s with 3.5 parser", from.schemaStore() ), e );
return Collections.emptyList();
}
finally
{
life.shutdown();
}
// Convert to cypher statements
return getSchemaStatements( stats, tokenHolders, indexes, constraints );
}
private static Collection<String> getSchemaStatements( StoreCopyStats stats, TokenHolders tokenHolders, Map<String,IndexDescriptor> indexes,
List<ConstraintDescriptor> constraints )
{
TokenRead tokenRead = new ReadOnlyTokenRead( tokenHolders );
// Here we use a map and insert indexes first; if an index has a backing constraint it will be replaced
// when the constraints are inserted afterwards, since the two share the same name
Map<String,String> schemaStatements = new HashMap<>();
for ( var entry : indexes.entrySet() )
for ( IndexDescriptor index : indexes.values() )
{
String statement;
try
{
statement = createStatement( tokenRead, entry.getValue() );
if ( !index.isUnique() )
{
schemaStatements.put( index.getName(), SchemaStatementProcedure.createStatement( tokenRead, index ) );
}
}
catch ( Exception e )
{
stats.invalidIndex( entry.getValue(), e );
continue;
stats.invalidIndex( index, e );
}
schemaStatements.put( entry.getKey(), statement );
}
for ( ConstraintDescriptor constraint : constraints )
{
String statement;
try
{
statement = createStatement( indexes::get, tokenRead, constraint );
schemaStatements.put( constraint.getName(), SchemaStatementProcedure.createStatement( indexes::get, tokenRead, constraint ) );
}
catch ( Exception e )
{
stats.invalidConstraint( constraint, e );
continue;
}
schemaStatements.put( constraint.getName(), statement );
}
return schemaStatements;
return schemaStatements.values();
}
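A minimal, self-contained illustration (toy code, not part of Neo4j) of the name-keyed overwrite described in the comment above; it relies on nothing more than plain Map.put semantics:

import java.util.LinkedHashMap;
import java.util.Map;

public class NameKeyedOverwriteDemo
{
    public static void main( String[] args )
    {
        Map<String,String> statements = new LinkedHashMap<>();
        // A backing index and its owning constraint share the same schema name,
        // so inserting the constraint statement second replaces the index statement.
        statements.put( "uniq_person_id", "CREATE INDEX ..." );
        statements.put( "uniq_person_id", "CREATE CONSTRAINT ..." );
        System.out.println( statements.values() ); // prints [CREATE CONSTRAINT ...]
    }
}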
private static RecordFormats setupRecordFormats( NeoStores neoStores, FormatEnum format )
private static RecordFormats setupRecordFormats( RecordFormats fromRecordFormat, FormatEnum format )
{
if ( format == FormatEnum.same )
{
return neoStores.getRecordFormats();
return findLatestFormatInFamily( fromRecordFormat )
.orElseThrow( () -> new IllegalArgumentException( "This version does not support format family " + fromRecordFormat.getFormatFamily() ) );
}
else if ( format == FormatEnum.high_limit )
{
......
@@ -86,7 +86,7 @@ public class IndexConfigMigrator extends AbstractStoreMigrationParticipant
{
for ( SchemaRule rule : ruleAccess.getAll() )
{
SchemaRule upgraded = migrateIndexConfig( rule, directoryLayout );
SchemaRule upgraded = migrateIndexConfig( rule, directoryLayout, fs, pageCache, indexProviderMap, log );
if ( upgraded != rule )
{
@@ -96,7 +96,8 @@
}
}
private SchemaRule migrateIndexConfig( SchemaRule rule, DatabaseLayout directoryLayout ) throws IOException
public static SchemaRule migrateIndexConfig( SchemaRule rule, DatabaseLayout directoryLayout, FileSystemAbstraction fs, PageCache pageCache,
IndexProviderMap indexProviderMap, Log log ) throws IOException
{
if ( rule instanceof IndexDescriptor )
{
......
@@ -135,11 +135,11 @@ enum IndexMigration
// └── fulltext-1.0
// └── 1
// ├── fulltext-1.0
// │   ├── 1
// │   │   ├── segments_1
// │   │   └── write.lock
// │   ├── failure-message
// │   └── fulltext-index.properties <- Fulltext index settings
// │ ├── 1
// │ │ ├── segments_1
// │ │ └── write.lock
// │ ├── failure-message
// │ └── fulltext-index.properties <- Fulltext index settings
// └── fulltext-1.0.tx <- Transaction folder
File fulltext10Dir = providerRootDirectories( layout )[0];
File directoryForIndex = path( fulltext10Dir, String.valueOf( indexId ) );
......
@@ -106,7 +106,7 @@ public class IndexProviderMigrator extends AbstractStoreMigrationParticipant
// Nothing to clean up.
}
private SchemaRule upgradeIndexProvider( SchemaRule rule )
public static SchemaRule upgradeIndexProvider( SchemaRule rule )
{
if ( rule instanceof IndexDescriptor )
{
......
@@ -161,7 +161,7 @@ public class StoreUpgrader
}
}
private boolean hasCurrentVersion( StoreVersionCheck storeVersionCheck )
private static boolean hasCurrentVersion( StoreVersionCheck storeVersionCheck )
{
String configuredVersion = storeVersionCheck.configuredVersion();
StoreVersionCheck.Result versionResult = storeVersionCheck.checkUpgrade( configuredVersion );
......
@@ -305,7 +305,7 @@ public class RecordFormatSelector
}
}
private static Optional<RecordFormats> findLatestFormatInFamily( RecordFormats result )
public static Optional<RecordFormats> findLatestFormatInFamily( RecordFormats result )
{
return Iterables.stream( allFormats() )
.filter( format -> isSameFamily( result, format ) )
......
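The hunk above is truncated; for context, a hedged guess at how the stream plausibly completes (assuming RecordFormats exposes the generation() ordering used elsewhere for format comparisons):

// Hedged sketch, not the verbatim source: pick the same-family format
// with the highest generation, i.e. the latest format in the family.
.max( Comparator.comparingInt( RecordFormats::generation ) );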
@@ -735,17 +735,27 @@ public class RecordStorageMigrator extends AbstractStoreMigrationParticipant
static void migrateSchemaRules( TokenHolders srcTokenHolders, SchemaStorage srcAccess, SchemaRuleMigrationAccess dstAccess ) throws KernelException
{
SchemaNameGiver nameGiver = new SchemaNameGiver( srcTokenHolders );
LinkedHashMap<Long,SchemaRule> rules = new LinkedHashMap<>();
schemaGenerateNames( srcAccess, srcTokenHolders, rules );
// Once all rules have been processed, write them out.
for ( SchemaRule rule : rules.values() )
{
List<SchemaRule> namedRules = new ArrayList<>();
List<SchemaRule> unnamedRules = new ArrayList<>();
srcAccess.getAll().forEach( r -> (hasName( r ) ? namedRules : unnamedRules).add( r ) );
// Make sure that we process explicitly named schemas first.
namedRules.forEach( r -> rules.put( r.getId(), r ) );
unnamedRules.forEach( r -> rules.put( r.getId(), r ) );
dstAccess.writeSchemaRule( rule );
}
}
public static void schemaGenerateNames( SchemaStorage srcAccess, TokenHolders srcTokenHolders,
Map<Long,SchemaRule> rules ) throws KernelException
{
SchemaNameGiver nameGiver = new SchemaNameGiver( srcTokenHolders );
List<SchemaRule> namedRules = new ArrayList<>();
List<SchemaRule> unnamedRules = new ArrayList<>();
srcAccess.getAll().forEach( r -> (hasName( r ) ? namedRules : unnamedRules).add( r ) );
// Make sure that we process explicitly named schemas first.
namedRules.forEach( r -> rules.put( r.getId(), r ) );
unnamedRules.forEach( r -> rules.put( r.getId(), r ) );
for ( Map.Entry<Long,SchemaRule> entry : rules.entrySet() )
{
@@ -785,12 +795,6 @@ public class RecordStorageMigrator extends AbstractStoreMigrationParticipant
}
}
}
// Once all rules have been processed, write them out.
for ( SchemaRule rule : rules.values() )
{
dstAccess.writeSchemaRule( rule );
}
}
private static boolean hasName( SchemaRule rule )
......