Skip to content

Commit

Permalink
Splits UpdatePuller into two roles: obligation fulfiller and update p…
Browse files Browse the repository at this point in the history
…uller
  • Loading branch information
tinwelint committed Oct 8, 2014
1 parent 4a557cc commit 6935266
Show file tree
Hide file tree
Showing 31 changed files with 622 additions and 412 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public static void createLogFileForNextVersionWithSomeDataInIt( File store, File

try
{
TransactionAppender appender = new PhysicalTransactionAppender( logFile, positionCache, null, null );
TransactionAppender appender = new PhysicalTransactionAppender( logFile, positionCache,
transactionIdStore, null );
appender.append( singleNodeTransaction() );
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryStart;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryWriterv1;

/**
* Serialized {@link CommittedTransactionRepresentation transactions} to raw bytes on the {@link ChannelBuffer network}.
* One serializer can be instantiated per response and is able to serialize one or many transactions.
*/
public class CommittedTransactionSerializer implements Visitor<CommittedTransactionRepresentation, IOException>
{
private final NetworkWritableLogChannel channel;
Expand Down
48 changes: 26 additions & 22 deletions enterprise/com/src/main/java/org/neo4j/com/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,32 +90,36 @@ public <PAYLOAD> Response<PAYLOAD> deserializeResponse( BlockingReadHandler<Chan
PAYLOAD response = payloadDeserializer.read( dechunkingBuffer, input );
StoreId storeId = readStoreId( dechunkingBuffer, input );

byte firstByte = dechunkingBuffer.readByte();
if ( firstByte != -1 )
{ // It's a transaction stream in this response
TransactionStream transactions = new TransactionStream()
// Response type is what previously was a byte saying how many data sources there were in the
// coming transaction stream response. For backwards compatibility we keep it as a byte and we introduce
// the transaction obligation response type as -1
byte responseType = dechunkingBuffer.readByte();
if ( responseType == TransactionObligationResponse.RESPONSE_TYPE )
{
// It is a transaction obligation response
long obligationTxId = dechunkingBuffer.readLong();
return new TransactionObligationResponse<>( response, storeId, obligationTxId, channelReleaser );
}

// It's a transaction stream in this response
TransactionStream transactions = new TransactionStream()
{
@Override
public void accept( Visitor<CommittedTransactionRepresentation, IOException> visitor ) throws IOException
{
@Override
public void accept( Visitor<CommittedTransactionRepresentation, IOException> visitor ) throws IOException
LogEntryReader<ReadableLogChannel> reader = new LogEntryReaderFactory().create();
NetworkReadableLogChannel channel = new NetworkReadableLogChannel( dechunkingBuffer );

try ( PhysicalTransactionCursor<ReadableLogChannel> cursor =
new PhysicalTransactionCursor<>( channel, reader ) )
{
LogEntryReader<ReadableLogChannel> reader = new LogEntryReaderFactory().create();
NetworkReadableLogChannel channel = new NetworkReadableLogChannel( dechunkingBuffer );

try ( PhysicalTransactionCursor<ReadableLogChannel> cursor =
new PhysicalTransactionCursor<>( channel, reader ) )
{
while ( cursor.next() && visitor.visit( cursor.get() ) )
{ // Plow through it
}
while ( cursor.next() && visitor.visit( cursor.get() ) )
{ // Plow through it
}
}
};
return new TransactionStreamResponse<>( response, storeId, transactions, channelReleaser );
}

// It is a transaction obligation response
long obligationTxId = dechunkingBuffer.readLong();
return new TransactionObligationResponse<>( response, storeId, obligationTxId, channelReleaser );
}
};
return new TransactionStreamResponse<>( response, storeId, transactions, channelReleaser );
}

protected abstract StoreId readStoreId( ChannelBuffer source, ByteBuffer byteBuffer );
Expand Down
16 changes: 13 additions & 3 deletions enterprise/com/src/main/java/org/neo4j/com/Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;

/**
* In response to a {@link Client#sendRequest(RequestType, RequestContext, Serializer, Deserializer) request}
* which contains a response value (T), and optionally some sort of side-effect,
* like {@link TransactionStreamResponse transaction stream} or {@link TransactionObligationResponse transaction oglibation}.
*/
public abstract class Response<T> implements AutoCloseable
{
private final T response;
Expand Down Expand Up @@ -74,15 +79,20 @@ public static <T> Response<T> empty()
*/
public interface Handler
{
/**
* Called for responses that handle {@link TransactionObligationResponse transaction obligations}
* after the obligation transaction id has been deserialized.
* @param txId the obligation transaction id that must be fulfilled.
* @throws IOException if there were any problems fulfilling that obligation.
*/
void obligation( long txId ) throws IOException;

/**
* Transaction stream is starting, containing the following data sources.
* Only called if there are at least one transaction in the coming transaction stream.
* @return a {@link Visitor} which will {@link Visitor#visit(Object) receive} calls about transactions.
*/
Visitor<CommittedTransactionRepresentation,IOException> transactions();
}

public static final Response<Void> EMPTY = new TransactionObligationResponse<Void>( null, StoreId.DEFAULT,
public static final Response<Void> EMPTY = new TransactionObligationResponse<>( null, StoreId.DEFAULT,
-1, ResourceReleaser.NO_OP );
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,18 @@

import java.io.IOException;

import org.neo4j.com.storecopy.TransactionObligationFulfiller;
import org.neo4j.kernel.impl.store.StoreId;

/**
* {@link Response} that carries transaction obligation as a side-effect.
*
* @see TransactionObligationFulfiller
*/
public class TransactionObligationResponse<T> extends Response<T>
{
public static final byte RESPONSE_TYPE = -1;

private final long obligationTxId;

public TransactionObligationResponse( T response, StoreId storeId, long obligationTxId, ResourceReleaser releaser )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@

import java.io.IOException;

import org.neo4j.com.storecopy.ResponseUnpacker;
import org.neo4j.kernel.impl.store.StoreId;

/**
* {@link Response} that carries {@link TransactionStream transaction data} as a side-effect, to be applied
* before accessing the response value.
*
* @see ResponseUnpacker
*/
public class TransactionStreamResponse<T> extends Response<T>
{
public static final byte RESPONSE_TYPE = 0;

private final TransactionStream transactions;

public TransactionStreamResponse( T response, StoreId storeId, TransactionStream transactions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;

import org.neo4j.com.Response;
import org.neo4j.com.TransactionStream;
import org.neo4j.com.TransactionStreamResponse;
import org.neo4j.com.storecopy.TransactionQueue.TransactionVisitor;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.helpers.collection.Visitor;
Expand All @@ -34,6 +36,15 @@
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.lifecycle.Lifecycle;

/**
* Receives and unpacks {@link Response responses}.
* Transaction obligations are handled by {@link TransactionObligationFulfiller} and
* {@link TransactionStream transaction streams} are {@link TransactionRepresentationStoreApplier applied to the store},
* in batches.
*
* It is assumed that any {@link TransactionStreamResponse response carrying transaction data} comes from the one
* and same thread.
*/
public class TransactionCommittingResponseUnpacker implements ResponseUnpacker, Lifecycle
{
private static final int DEFAULT_BATCH_SIZE = 30;
Expand All @@ -58,7 +69,7 @@ public void obligation( long txId ) throws IOException

try
{
obligationFulfiller.pullUpdates( txId );
obligationFulfiller.fulfill( txId );
}
catch ( InterruptedException e )
{
Expand Down Expand Up @@ -192,7 +203,7 @@ private static TransactionObligationFulfiller resolveTransactionObligationFulfil
return new TransactionObligationFulfiller()
{
@Override
public void pullUpdates( long toTxId )
public void fulfill( long toTxId )
{
throw new UnsupportedOperationException( "Should not be called" );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
*/
package org.neo4j.com.storecopy;

/**
* Fulfills transaction obligations, i.e. ensures that the database has committed and applied a particular
* transaction id.
*/
public interface TransactionObligationFulfiller
{
void pullUpdates( final long toTxId ) throws InterruptedException;
void fulfill( long toTxId ) throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void shouldAwaitTransactionObligationsToBeFulfilled() throws Throwable
unpacker.unpackResponse( new DummyObligationResponse( 4 ), NO_OP_TX_HANDLER );

// THEN
verify( obligationFulfiller, times( 1 ) ).pullUpdates( 4l );
verify( obligationFulfiller, times( 1 ) ).fulfill( 4l );
}

private static class StoppingTxHandler implements ResponseUnpacker.TxHandler
Expand Down Expand Up @@ -281,7 +281,7 @@ public ControlledObligationFulfuller( DoubleLatch latch )
}

@Override
public void pullUpdates( long toTxId ) throws InterruptedException
public void fulfill( long toTxId ) throws InterruptedException
{
latch.startAndAwaitFinish();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ protected void create()
masterDelegateInvocationHandler = new DelegateInvocationHandler<>( Master.class );
master = (Master) Proxy.newProxyInstance( Master.class.getClassLoader(), new Class[]{Master.class},
masterDelegateInvocationHandler );
final int serverId = config.get( ClusterSettings.server_id ).toIntegerIndex();
requestContextFactory = dependencies.satisfyDependency(new RequestContextFactory( serverId, getDependencyResolver() ));
InstanceId serverId = config.get( ClusterSettings.server_id );
requestContextFactory = dependencies.satisfyDependency(new RequestContextFactory( serverId.toIntegerIndex(),
getDependencyResolver() ));

this.responseUnpacker = dependencies.satisfyDependency(
new TransactionCommittingResponseUnpacker( getDependencyResolver() ) );
Expand All @@ -180,8 +181,12 @@ public KernelAPI instance()

life.add( responseUnpacker );

dependencies.satisfyDependency( life.add( new UpdatePuller( memberStateMachine, master, requestContextFactory,
availabilityGuard, lastUpdateTime, config, jobScheduler, msgLog, dependencyResolver ) ) );
UpdatePuller updatePuller = life.add( new UpdatePuller( memberStateMachine, availabilityGuard,
requestContextFactory, master, lastUpdateTime, logging, serverId ) );
dependencies.satisfyDependency( life.add( new UpdatePullerClient( config.get( HaSettings.pull_interval ),
jobScheduler, logging, updatePuller ) ) );
dependencies.satisfyDependency( life.add( new UpdatePullingTransactionObligationFulfiller(
updatePuller, memberStateMachine, serverId, dependencies ) ) );

stateSwitchTimeoutMillis = config.get( HaSettings.state_switch_timeout );

Expand Down
Loading

0 comments on commit 6935266

Please sign in to comment.