diff --git a/pom.xml b/pom.xml index 50e453c..b86d1a2 100644 --- a/pom.xml +++ b/pom.xml @@ -1,90 +1,105 @@ - - 4.0.0 + + 4.0.0 - com.yuandian.dataflow - dataflow - 1.0-SNAPSHOT - jar + com.yuandian.dataflow + dataflow + 1.0-SNAPSHOT + jar - dataflow - http://maven.apache.org + dataflow + http://maven.apache.org - - UTF-8 - 11 - 11 - 11 + + UTF-8 + 11 + 11 + 11 3.20.1 1.7.4 + 1.3.2 + 1.46.0 + 1.7.36 + 1.3.10 + - 1.45.1 - - - - - junit - junit - 3.8.1 - test - - - - - com.google.protobuf - protobuf-java - ${protobuf.version} - - - - io.protostuff - protostuff-core - ${protostuff.version} - - - - io.protostuff - protostuff-runtime - ${protostuff.version} - - - - javax.annotation - javax.annotation-api - 1.3.2 - + + + junit + junit + 3.8.1 + test - - + + org.slf4j + slf4j-api + ${slf4j.version} + + + + com.alipay.sofa + jraft-core + ${jraft.version} + + + + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + + io.protostuff + protostuff-core + ${protostuff.version} + + + + io.protostuff + protostuff-runtime + ${protostuff.version} + + + + javax.annotation + javax.annotation-api + ${javax.annotation.version} + + + + + + io.grpc grpc-netty - 1.46.0 + ${grpc.version} io.grpc grpc-protobuf - 1.46.0 + ${grpc.version} io.grpc grpc-stub - 1.46.0 + ${grpc.version} - io.grpc - grpc-all - 1.46.0 - - - - - - + io.grpc + grpc-all + ${grpc.version} + + + + + + + + - - kr.motd.maven @@ -94,28 +109,33 @@ - - kr.motd.maven - os-maven-plugin - 1.6.2 - + + kr.motd.maven + os-maven-plugin + 1.6.2 + org.xolstice.maven.plugins protobuf-maven-plugin 0.6.1 - com.google.protobuf:protoc:3.13.0:exe:${os.detected.classifier} + + grpc-java + com.google.protobuf:protoc:3.13.0:exe:${os.detected.classifier} io.grpc:protoc-gen-grpc-java:1.46.0:exe:${os.detected.classifier} + compile compile-custom + test-compile + test-compile-custom - + \ No newline at end of file diff --git a/proto_grpc.sh b/proto_grpc.sh index c8414bb..62518a3 100755 --- a/proto_grpc.sh +++ b/proto_grpc.sh @@ -1,4 +1,7 @@ #! /bin/bash -OUTPUT_FILE="./src/main/java/" +# OUTPUT_FILE="./src/main/java/" -protoc src/main/java/com/yuandian/dataflow/rpc/*.proto --plugin=protoc-gen-grpc-java --java_out=$OUTPUT_FILE --grpc-java_out=$OUTPUT_FILE \ No newline at end of file +# for PROTOFILE in `find ./src -name "*.proto"` +# do +# protoc $PROTOFILE --plugin=/home/eson/workspace/dataflow/target/protoc-plugins --java_out=$OUTPUT_FILE --grpc-java_out=$OUTPUT_FILE +# done \ No newline at end of file diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index e08e165..3f351ac 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -1,13 +1,22 @@ package com.yuandian.dataflow; +import com.yuandian.dataflow.rpc.DataFlowGrpc; +import com.yuandian.dataflow.rpc.DataFlowGrpc.DataFlowImplBase; + /** * Hello world! * */ public class Server { + public class ServerImpl extends DataFlowImplBase { + + } + + public static void main( String[] args ) { + System.out.println( "Hello World!" ); - } + } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java new file mode 100644 index 0000000..22fffc2 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -0,0 +1,99 @@ +package com.yuandian.dataflow.statemachine; + + + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Iterator; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.core.StateMachineAdapter; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.error.RaftException; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.alipay.sofa.jraft.util.Utils; + +/** + * Counter state machine. + * + * @author boyan (boyan@alibaba-inc.com) + * + * 2018-Apr-09 4:52:31 PM + */ +public class StateMachine extends StateMachineAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); + + /** + * Counter value + */ + private final AtomicLong value = new AtomicLong(0); + /** + * Leader term + */ + private final AtomicLong leaderTerm = new AtomicLong(-1); + + public boolean isLeader() { + return this.leaderTerm.get() > 0; + } + + /** + * Returns current value. + */ + public long getValue() { + return this.value.get(); + } + + @Override + public void onApply(final Iterator iter) { + while (iter.hasNext()) { + + if (iter.done() != null) { + // This task is applied by this node, get value from closure to avoid additional parsing. + + } else { + // Have to parse FetchAddRequest from this user log. + + } + + iter.next(); + } + } + + @Override + public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { + return; + } + + @Override + public void onError(final RaftException e) { + LOG.error("Raft error: {}", e, e); + } + + @Override + public boolean onSnapshotLoad(final SnapshotReader reader) { + + return true; + } + + @Override + public void onLeaderStart(final long term) { + this.leaderTerm.set(term); + super.onLeaderStart(term); + + } + + @Override + public void onLeaderStop(final Status status) { + this.leaderTerm.set(-1); + super.onLeaderStop(status); + } + +}