diff --git a/pom.xml b/pom.xml
index b86d1a2..196f82c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,6 +18,7 @@
3.20.1
1.7.4
1.3.2
+ 2.3.0
1.46.0
1.7.36
1.3.10
@@ -43,6 +44,30 @@
${jraft.version}
+
+
+ org.apache.ratis
+ ratis-common
+ ${ratis.version}
+
+
+
+ org.apache.ratis
+ ratis-server
+ ${ratis.version}
+
+
+
+
+ org.apache.ratis
+ ratis-proto
+ ${ratis.version}
+
+
+
+
+
+
@@ -125,7 +150,7 @@
io.grpc:protoc-gen-grpc-java:1.46.0:exe:${os.detected.classifier}
-
+
diff --git a/proto_grpc.sh b/proto_grpc.sh
index 62518a3..81013b0 100755
--- a/proto_grpc.sh
+++ b/proto_grpc.sh
@@ -4,4 +4,6 @@
# for PROTOFILE in `find ./src -name "*.proto"`
# do
# protoc $PROTOFILE --plugin=/home/eson/workspace/dataflow/target/protoc-plugins --java_out=$OUTPUT_FILE --grpc-java_out=$OUTPUT_FILE
-# done
\ No newline at end of file
+# done
+
+ln -sf ./target/gener
\ No newline at end of file
diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java
index 3f351ac..054c176 100644
--- a/src/main/java/com/yuandian/dataflow/Server.java
+++ b/src/main/java/com/yuandian/dataflow/Server.java
@@ -2,6 +2,41 @@ package com.yuandian.dataflow;
import com.yuandian.dataflow.rpc.DataFlowGrpc;
import com.yuandian.dataflow.rpc.DataFlowGrpc.DataFlowImplBase;
+import com.yuandian.dataflow.rpc.Scheduler.Response;
+import com.yuandian.dataflow.rpc.Scheduler.State;
+import com.yuandian.dataflow.statemachine.RaftClosure;
+import com.yuandian.dataflow.statemachine.StateMachine;
+
+import java.nio.ByteBuffer;
+
+import com.alipay.sofa.jraft.Closure;
+import com.alipay.sofa.jraft.Iterator;
+import com.alipay.sofa.jraft.JRaftUtils;
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.NodeManager;
+import com.alipay.sofa.jraft.RaftGroupService;
+import com.alipay.sofa.jraft.RaftServiceFactory;
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.core.IteratorImpl;
+import com.alipay.sofa.jraft.core.NodeImpl;
+import com.alipay.sofa.jraft.core.ReplicatorGroupImpl;
+import com.alipay.sofa.jraft.entity.NodeId;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.entity.Task;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.option.RaftOptions;
+import com.alipay.sofa.jraft.rpc.CliClientService;
+import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
+import com.alipay.sofa.jraft.rpc.RpcClient;
+import com.alipay.sofa.jraft.rpc.RpcServer;
+import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
+import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
+import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
+
+import io.grpc.stub.StreamObserver;
/**
* Hello world!
@@ -10,13 +45,65 @@ import com.yuandian.dataflow.rpc.DataFlowGrpc.DataFlowImplBase;
public class Server
{
public class ServerImpl extends DataFlowImplBase {
-
+
+ @Override
+ public void update(State request, StreamObserver responseObserver) {
+ // TODO Auto-generated method stub
+ super.update(request, responseObserver);
+
+
+ }
+
}
public static void main( String[] args )
{
-
- System.out.println( "Hello World!" );
+
+ String groupId = "jraft";
+ // Endpoint addr = new Endpoint("localhost", 8080);
+ // String s = addr.toString(); // 结果为 localhost:8080
+ // PeerId peer = new PeerId();
+ // boolean success = peer.parse(s);
+ Endpoint addr = JRaftUtils.getEndPoint("localhost:8080");
+ PeerId serverId = JRaftUtils.getPeerId("localhost:8080");
+ Configuration conf = JRaftUtils.getConfiguration("localhost:8081,localhost:8080");
+
+ // //IteratorImpl it = new IteratorImpl();
+ Closure done = new RaftClosure();
+ Task task = new Task();
+ task.setData(ByteBuffer.wrap("Hello! This is a sample message to test raft..".getBytes()));
+ task.setDone(done);
+
+ NodeOptions opts = new NodeOptions();
+ opts.setElectionTimeoutMs(1000);
+ opts.setLogUri("logs");
+ opts.setRaftMetaUri("raftMeta");
+ opts.setSnapshotUri("snapshots");
+ opts.setFsm(new StateMachine());
+ opts.setInitialConf(conf);
+ NodeImpl node = (NodeImpl) RaftServiceFactory.createRaftNode(groupId, serverId);
+ NodeId nodeId = node.getNodeId();
+ NodeManager.getInstance().addAddress(serverId.getEndpoint());
+ BoltRpcServer rpcServer = (BoltRpcServer) RaftRpcServerFactory.createAndStartRaftRpcServer(serverId.getEndpoint());
+ RaftGroupService cluster = new RaftGroupService(groupId, serverId, opts);
+ //RpcServer rpcServer2 = cluster.getRpcServer();
+ //Node node2 = cluster.start();
+
+ BoltRaftRpcFactory boltRaftRpcFactory = new BoltRaftRpcFactory();
+ RpcClient boltRpcClient = boltRaftRpcFactory.createRpcClient();
+ //boltRpcClient.registerConnectEventListener(null);
+ //boltRpcClient.invokeSync(addr, s, 0)
+ ReplicatorGroupOptions replicatorGroupOptions = new ReplicatorGroupOptions();
+ RaftOptions raftOptions = new RaftOptions();
+ replicatorGroupOptions.setRaftOptions(raftOptions);
+ replicatorGroupOptions.setNode(node);
+
+ ReplicatorGroupImpl replicatorGroupImpl = new ReplicatorGroupImpl();
+ replicatorGroupImpl.init(nodeId, replicatorGroupOptions);
+
+ boltRpcClient.registerConnectEventListener(replicatorGroupImpl);
+
+
}
}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java
new file mode 100644
index 0000000..c2a38d7
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java
@@ -0,0 +1,21 @@
+package com.yuandian.dataflow.statemachine;
+
+import com.alipay.sofa.jraft.Closure;
+import com.alipay.sofa.jraft.Status;
+
+public class RaftClosure implements Closure {
+
+ @Override
+ public void run(Status status) {
+
+ System.out.println("Task completed with status"+status.getCode());
+ System.out.println("Task completed with "+status.getErrorMsg());
+ System.out.println("Task completed with "+status.getRaftError());
+ }
+
+// @Override
+// public void onCommitted() {
+// System.out.println("Task onCommitted");
+// }
+
+}