Commit b5ee6532 authored by andrey zaytsev's avatar andrey zaytsev
Browse files

read index from cassandra

parent b30af0ec
Showing with 280 additions and 11 deletions
+280 -11
...@@ -145,9 +145,8 @@ public class StubIndexImpl extends StubIndex implements ApplicationComponentAdap ...@@ -145,9 +145,8 @@ public class StubIndexImpl extends StubIndex implements ApplicationComponentAdap
); );
final MemoryIndexStorage<K, StubIdList> memStorage = new MemoryIndexStorage<>(storage, indexKey); final MemoryIndexStorage<K, StubIdList> memStorage = new MemoryIndexStorage<>(storage, indexKey);
IndexStorage<K, StubIdList> finalStorage = PersistentFSImpl.indexer ? new IndexerIndexStorage<>(memStorage, indexKey, extension.getKeyDescriptor(), IndexStorage<K, StubIdList> finalStorage = PersistentFSImpl.indexer ? new IndexerIndexStorage<>(memStorage, indexKey, extension.getKeyDescriptor(), StubIdExternalizer.INSTANCE)
StubIdExternalizer.INSTANCE) : new ClientIndexStorage<>(memStorage, indexKey, extension.getKeyDescriptor(), StubIdExternalizer.INSTANCE);
: memStorage;
MyIndex<K> index = new MyIndex<>(new IndexExtension<K, StubIdList, Void>() { MyIndex<K> index = new MyIndex<>(new IndexExtension<K, StubIdList, Void>() {
@NotNull @NotNull
@Override @Override
......
package com.intellij.util.indexing;/*
* Copyright 2000-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
public class ByteBufferInputStream extends InputStream {
private ByteBuffer _buf;
public ByteBufferInputStream(ByteBuffer buf) {
_buf = buf;
}
public void close() {
}
public int available() {
return _buf.remaining();
}
public boolean markSupported() {
return true;
}
public void mark(int readlimit) {
_buf.mark();
}
public void reset() {
_buf.reset();
}
public long skip(long n) {
int nP = Math.min((int)n, _buf.remaining());
_buf.position(_buf.position() + nP);
return (long)nP;
}
public int read() throws IOException {
if (!_buf.hasRemaining()) {
return -1;
} else {
return (int) _buf.get() & 0xFF;
}
}
public int read(byte[] bytes, int offset, int length) throws IOException {
length = Math.min(length, _buf.remaining());
if (length == 0) {
return -1;
} else {
_buf.get(bytes, offset, length);
return length;
}
}
}
...@@ -12,6 +12,15 @@ import com.intellij.openapi.util.Pair; ...@@ -12,6 +12,15 @@ import com.intellij.openapi.util.Pair;
public class CassandraIndexTable { public class CassandraIndexTable {
private final Session mySession; private final Session mySession;
private final PreparedStatement myQueryStatement;
public Collection<ByteBuffer> readKey(String id, int indexingSession, ByteBuffer key) {
ResultSet result = mySession.execute(myQueryStatement.bind(indexingSession, id, key));
Row row = result.one();
Map<Integer, ByteBuffer> byShard = row.getMap("result", Integer.class, ByteBuffer.class);
ByteBuffer buffer = byShard.get(1);
return byShard.values();
}
public static CassandraIndexTable getInstance() { public static CassandraIndexTable getInstance() {
return ApplicationManager.getApplication().getComponent(CassandraIndexTable.class); return ApplicationManager.getApplication().getComponent(CassandraIndexTable.class);
...@@ -23,10 +32,44 @@ public class CassandraIndexTable { ...@@ -23,10 +32,44 @@ public class CassandraIndexTable {
.withPort(9042) .withPort(9042)
.build(); .build();
mySession = cluster.connect(); mySession = cluster.connect();
mySession.execute("CREATE KEYSPACE IF NOT EXISTS indices WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : 1};");
mySession.execute("CREATE TABLE IF NOT EXISTS indices.indices(" +
"index_id text, " +
"key blob, " +
"shard_id int, " +
"indexing_session int, " +
"values blob, " +
"PRIMARY KEY ((index_id, key), indexing_session, shard_id)) " +
"WITH CLUSTERING ORDER BY (indexing_session DESC)");
mySession.execute("CREATE OR REPLACE FUNCTION indices.uniqueShards (state map<int, blob>, shard int, val blob) " +
"CALLED ON NULL INPUT RETURNS map<int, blob> LANGUAGE java as " +
"'" +
"if (state == null) { " +
"state = new java.util.HashMap<Integer, java.nio.ByteBuffer>(); " +
"} " +
"if (state.containsKey(shard)) { " +
" return state; " +
"} " +
"state.put(shard, val); " +
" return state; " +
"'");
mySession.execute("CREATE OR REPLACE AGGREGATE indices.queryIndex (int, blob) " +
"SFUNC uniqueShards " +
"STYPE map<int, blob>");
myQueryStatement = mySession.prepare(new SimpleStatement("SELECT queryIndex(shard_id, values) as result FROM indices.indices " +
"WHERE indexing_session <= ? AND " +
"index_id = ? AND " +
"key = ? " +
"ORDER BY indexing_session DESC"));
} }
public void bulkInsert(String indexId, int shardId, int indexingSession, Stream<Pair<ByteBuffer, ByteBuffer>> s) { public void bulkInsert(String indexId, int shardId, int indexingSession, Stream<Pair<ByteBuffer, ByteBuffer>> s) {
int chunkSize = 5 * 1024; int chunkSize = 50 * 1024;
List<Pair<ByteBuffer, ByteBuffer>> chunk = new ArrayList<>(); List<Pair<ByteBuffer, ByteBuffer>> chunk = new ArrayList<>();
int currentChunkBytes = 0; int currentChunkBytes = 0;
Iterator<Pair<ByteBuffer, ByteBuffer>> iterator = s.iterator(); Iterator<Pair<ByteBuffer, ByteBuffer>> iterator = s.iterator();
...@@ -35,7 +78,6 @@ public class CassandraIndexTable { ...@@ -35,7 +78,6 @@ public class CassandraIndexTable {
int size = pair.first.limit() - pair.first.position() + pair.second.limit() - pair.second.position(); int size = pair.first.limit() - pair.first.position() + pair.second.limit() - pair.second.position();
if (size > chunkSize) { if (size > chunkSize) {
insertOne(indexId, shardId, indexingSession, pair); insertOne(indexId, shardId, indexingSession, pair);
//System.out.println("Chunk is too large indexId: " + indexId + " size: " + size + " max allowed: " + chunkSize);
} else { } else {
if (currentChunkBytes + size > chunkSize) { if (currentChunkBytes + size > chunkSize) {
flushChunk(indexId, shardId, indexingSession, chunk); flushChunk(indexId, shardId, indexingSession, chunk);
...@@ -73,7 +115,7 @@ public class CassandraIndexTable { ...@@ -73,7 +115,7 @@ public class CassandraIndexTable {
private void flushChunk(String indexId, int shardId, int sessionId, List<Pair<ByteBuffer, ByteBuffer>> chunk) { private void flushChunk(String indexId, int shardId, int sessionId, List<Pair<ByteBuffer, ByteBuffer>> chunk) {
PreparedStatement insertStmt = getInsertStatement(indexId, shardId, sessionId); PreparedStatement insertStmt = getInsertStatement(indexId, shardId, sessionId);
BatchStatement batch = new BatchStatement(); BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
for (Pair<ByteBuffer, ByteBuffer> pair : chunk) { for (Pair<ByteBuffer, ByteBuffer> pair : chunk) {
batch.add(insertStmt.bind(pair.first, pair.second)); batch.add(insertStmt.bind(pair.first, pair.second));
} }
......
package com.intellij.util.indexing;
import com.intellij.openapi.util.io.BufferExposingByteArrayOutputStream;
import com.intellij.psi.search.GlobalSearchScope;
import com.intellij.util.Processor;
import com.intellij.util.indexing.impl.UpdatableValueContainer;
import com.intellij.util.io.DataExternalizer;
import com.intellij.util.io.DataInputOutputUtil;
import com.intellij.util.io.DataOutputStream;
import com.intellij.util.io.KeyDescriptor;
import gnu.trove.THashSet;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.io.DataInputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
public class ClientIndexStorage<K, V> implements VfsAwareIndexStorage<K, V>, BufferingIndexStorage {
private final VfsAwareIndexStorage<K, V> myDelegate;
private final ID<?, ?> myId;
private final KeyDescriptor<K> myKd;
private final DataExternalizer<V> myVd;
private final THashSet<K> myCachedKeys = new THashSet<>();
public ClientIndexStorage(VfsAwareIndexStorage<K, V> delegate, ID<?, ?> indexId, KeyDescriptor<K> kd, DataExternalizer<V> vd) {
myDelegate = delegate;
myId = indexId;
myKd = kd;
myVd = vd;
}
@Override
public void addBufferingStateListener(@NotNull BufferingStateListener listener) {
((BufferingIndexStorage)myDelegate).addBufferingStateListener(listener);
}
@Override
public void removeBufferingStateListener(@NotNull BufferingStateListener listener) {
((BufferingIndexStorage)myDelegate).removeBufferingStateListener(listener);
}
@Override
public void setBufferingEnabled(boolean enabled) {
((BufferingIndexStorage)myDelegate).setBufferingEnabled(enabled);
}
@Override
public boolean isBufferingEnabled() {
return ((BufferingIndexStorage)myDelegate).isBufferingEnabled();
}
@Override
public void clearMemoryMap() {
((BufferingIndexStorage)myDelegate).clearMemoryMap();
}
@Override
public void fireMemoryStorageCleared() {
((BufferingIndexStorage)myDelegate).fireMemoryStorageCleared();
}
@Override
public boolean processKeys(@NotNull Processor<K> processor, GlobalSearchScope scope, @Nullable IdFilter idFilter)
throws StorageException {
return myDelegate.processKeys(processor, scope, idFilter);
}
@Override
public void addValue(K k, int inputId, V v) throws StorageException {
myDelegate.addValue(k, inputId, v);
}
@Override
public void removeAllValues(@NotNull K k, int inputId) throws StorageException {
myDelegate.removeAllValues(k, inputId);
}
@Override
public void clear() throws StorageException {
}
@NotNull
@Override
public ValueContainer<V> read(K k) throws StorageException {
if (!myCachedKeys.contains(k)) {
try {
cacheKey(k);
}
catch (IOException e) {
throw new StorageException(e);
}
myCachedKeys.add(k);
}
return myDelegate.read(k);
}
private static <Value> void readInto(UpdatableValueContainer<Value> into, DataInputStream stream, DataExternalizer<Value> externalizer) throws IOException {
while (stream.available() > 0) {
final int valueCount = DataInputOutputUtil.readINT(stream);
if (valueCount < 0) {
final int inputId = -valueCount;
into.removeAssociatedValue(inputId);
}
else {
for (int valueIdx = 0; valueIdx < valueCount; valueIdx++) {
final Value value = externalizer.read(stream);
int idCountOrSingleValue = DataInputOutputUtil.readINT(stream);
if (idCountOrSingleValue > 0) {
into.addValue(idCountOrSingleValue, value);
}
else {
idCountOrSingleValue = -idCountOrSingleValue;
int prev = 0;
for (int i = 0; i < idCountOrSingleValue; i++) {
final int id = DataInputOutputUtil.readINT(stream);
into.addValue(prev + id, value);
prev += id;
}
}
}
}
}
}
private void cacheKey(K k) throws IOException, StorageException {
BufferExposingByteArrayOutputStream baos = new BufferExposingByteArrayOutputStream();
myKd.save(new DataOutputStream(baos), k);
ByteBuffer bb = ByteBuffer.wrap(baos.getInternalBuffer(), 0, baos.size());
Collection<ByteBuffer> values = CassandraIndexTable.getInstance().readKey(myId.toString(), 1, bb);
UpdatableValueContainer<V> container = (UpdatableValueContainer<V>)myDelegate.read(k);
for (ByteBuffer value : values) {
ByteBufferInputStream stream = new ByteBufferInputStream(value);
readInto(container, new DataInputStream(stream), myVd);
}
}
@Override
public void clearCaches() {
myDelegate.clearCaches();
}
@Override
public void close() throws StorageException {
myDelegate.close();
}
@Override
public void flush() throws IOException {
myDelegate.flush();
}
}
\ No newline at end of file
...@@ -399,7 +399,7 @@ public class FileBasedIndexImpl extends FileBasedIndex { ...@@ -399,7 +399,7 @@ public class FileBasedIndexImpl extends FileBasedIndex {
if (PersistentFSImpl.indexer) { if (PersistentFSImpl.indexer) {
finalStorage = new IndexerIndexStorage<>(mem, name, extension.getKeyDescriptor(), extension.getValueExternalizer()); finalStorage = new IndexerIndexStorage<>(mem, name, extension.getKeyDescriptor(), extension.getValueExternalizer());
} else { } else {
finalStorage = mem; finalStorage = new ClientIndexStorage<>(mem, name, extension.getKeyDescriptor(), extension.getValueExternalizer());
} }
state.registerIndex(name, state.registerIndex(name,
createIndex(extension, finalStorage), createIndex(extension, finalStorage),
......
...@@ -92,7 +92,7 @@ public class PersistentFSImpl extends PersistentFS implements ApplicationCompone ...@@ -92,7 +92,7 @@ public class PersistentFSImpl extends PersistentFS implements ApplicationCompone
LowMemoryWatcher.register(this::clearIdCache, this); LowMemoryWatcher.register(this::clearIdCache, this);
} }
public static boolean indexer = true; public static boolean indexer = false;
@Override @Override
public void initComponent() { public void initComponent() {
...@@ -104,7 +104,7 @@ public class PersistentFSImpl extends PersistentFS implements ApplicationCompone ...@@ -104,7 +104,7 @@ public class PersistentFSImpl extends PersistentFS implements ApplicationCompone
assert !once.get(); assert !once.get();
once.set(true); once.set(true);
if (indexer) { if (indexer || true) {
return new FSRecords(new File(FSRecords.getCachesDir())); return new FSRecords(new File(FSRecords.getCachesDir()));
} else { } else {
FSRecords sourceRecords = new FSRecords(new File(FSRecords.getCachesDir())); FSRecords sourceRecords = new FSRecords(new File(FSRecords.getCachesDir()));
......
...@@ -25,7 +25,7 @@ import java.util.List; ...@@ -25,7 +25,7 @@ import java.util.List;
/** /**
* Created by Maxim.Mossienko on 7/4/2014. * Created by Maxim.Mossienko on 7/4/2014.
*/ */
class FileId2ValueMapping<Value> { public class FileId2ValueMapping<Value> {
private TIntObjectHashMap<Value> id2ValueMap; private TIntObjectHashMap<Value> id2ValueMap;
private ValueContainerImpl<Value> valueContainer; private ValueContainerImpl<Value> valueContainer;
private boolean myOnePerFileValidationEnabled = true; private boolean myOnePerFileValidationEnabled = true;
......
...@@ -19,6 +19,7 @@ package com.intellij.util.indexing.impl; ...@@ -19,6 +19,7 @@ package com.intellij.util.indexing.impl;
import com.intellij.util.indexing.ValueContainer; import com.intellij.util.indexing.ValueContainer;
import com.intellij.util.io.DataExternalizer; import com.intellij.util.io.DataExternalizer;
import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
......
...@@ -41,7 +41,7 @@ import java.util.List; ...@@ -41,7 +41,7 @@ import java.util.List;
* @author Eugene Zhuravlev * @author Eugene Zhuravlev
* Date: Dec 20, 2007 * Date: Dec 20, 2007
*/ */
class ValueContainerImpl<Value> extends UpdatableValueContainer<Value> implements Cloneable{ public class ValueContainerImpl<Value> extends UpdatableValueContainer<Value> implements Cloneable{
private static final Logger LOG = Logger.getInstance("#com.intellij.util.indexing.impl.ValueContainerImpl"); private static final Logger LOG = Logger.getInstance("#com.intellij.util.indexing.impl.ValueContainerImpl");
private final static Object myNullValue = new Object(); private final static Object myNullValue = new Object();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment