From ff849d073723f08d1607d40c78361b3a40709cd8 Mon Sep 17 00:00:00 2001 From: eson <474420502@qq.com> Date: Mon, 30 May 2022 01:03:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E7=94=A8java?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 29 +++++- proto_grpc.sh | 4 +- .../java/com/yuandian/dataflow/Server.java | 93 ++++++++++++++++++- .../dataflow/statemachine/RaftClosure.java | 21 +++++ 4 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java diff --git a/pom.xml b/pom.xml index b86d1a2..196f82c 100644 --- a/pom.xml +++ b/pom.xml @@ -18,6 +18,7 @@ 3.20.1 1.7.4 1.3.2 + 2.3.0 1.46.0 1.7.36 1.3.10 @@ -43,6 +44,30 @@ ${jraft.version} + + + org.apache.ratis + ratis-common + ${ratis.version} + + + + org.apache.ratis + ratis-server + ${ratis.version} + + + + + org.apache.ratis + ratis-proto + ${ratis.version} + + + + + + @@ -125,7 +150,7 @@ io.grpc:protoc-gen-grpc-java:1.46.0:exe:${os.detected.classifier} - + diff --git a/proto_grpc.sh b/proto_grpc.sh index 62518a3..81013b0 100755 --- a/proto_grpc.sh +++ b/proto_grpc.sh @@ -4,4 +4,6 @@ # for PROTOFILE in `find ./src -name "*.proto"` # do # protoc $PROTOFILE --plugin=/home/eson/workspace/dataflow/target/protoc-plugins --java_out=$OUTPUT_FILE --grpc-java_out=$OUTPUT_FILE -# done \ No newline at end of file +# done + +ln -sf ./target/gener \ No newline at end of file diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 3f351ac..054c176 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -2,6 +2,41 @@ package com.yuandian.dataflow; import com.yuandian.dataflow.rpc.DataFlowGrpc; import com.yuandian.dataflow.rpc.DataFlowGrpc.DataFlowImplBase; +import com.yuandian.dataflow.rpc.Scheduler.Response; +import com.yuandian.dataflow.rpc.Scheduler.State; +import com.yuandian.dataflow.statemachine.RaftClosure; +import com.yuandian.dataflow.statemachine.StateMachine; + +import java.nio.ByteBuffer; + +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Iterator; +import com.alipay.sofa.jraft.JRaftUtils; +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.NodeManager; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.RaftServiceFactory; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.core.IteratorImpl; +import com.alipay.sofa.jraft.core.NodeImpl; +import com.alipay.sofa.jraft.core.ReplicatorGroupImpl; +import com.alipay.sofa.jraft.entity.NodeId; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.entity.Task; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.rpc.CliClientService; +import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; +import com.alipay.sofa.jraft.rpc.RpcClient; +import com.alipay.sofa.jraft.rpc.RpcServer; +import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer; +import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; +import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient; +import com.alipay.sofa.jraft.util.Endpoint; + +import com.alipay.sofa.jraft.option.ReplicatorGroupOptions; + +import io.grpc.stub.StreamObserver; /** * Hello world! @@ -10,13 +45,65 @@ import com.yuandian.dataflow.rpc.DataFlowGrpc.DataFlowImplBase; public class Server { public class ServerImpl extends DataFlowImplBase { - + + @Override + public void update(State request, StreamObserver responseObserver) { + // TODO Auto-generated method stub + super.update(request, responseObserver); + + + } + } public static void main( String[] args ) { - - System.out.println( "Hello World!" ); + + String groupId = "jraft"; + // Endpoint addr = new Endpoint("localhost", 8080); + // String s = addr.toString(); // 结果为 localhost:8080 + // PeerId peer = new PeerId(); + // boolean success = peer.parse(s); + Endpoint addr = JRaftUtils.getEndPoint("localhost:8080"); + PeerId serverId = JRaftUtils.getPeerId("localhost:8080"); + Configuration conf = JRaftUtils.getConfiguration("localhost:8081,localhost:8080"); + + // //IteratorImpl it = new IteratorImpl(); + Closure done = new RaftClosure(); + Task task = new Task(); + task.setData(ByteBuffer.wrap("Hello! This is a sample message to test raft..".getBytes())); + task.setDone(done); + + NodeOptions opts = new NodeOptions(); + opts.setElectionTimeoutMs(1000); + opts.setLogUri("logs"); + opts.setRaftMetaUri("raftMeta"); + opts.setSnapshotUri("snapshots"); + opts.setFsm(new StateMachine()); + opts.setInitialConf(conf); + NodeImpl node = (NodeImpl) RaftServiceFactory.createRaftNode(groupId, serverId); + NodeId nodeId = node.getNodeId(); + NodeManager.getInstance().addAddress(serverId.getEndpoint()); + BoltRpcServer rpcServer = (BoltRpcServer) RaftRpcServerFactory.createAndStartRaftRpcServer(serverId.getEndpoint()); + RaftGroupService cluster = new RaftGroupService(groupId, serverId, opts); + //RpcServer rpcServer2 = cluster.getRpcServer(); + //Node node2 = cluster.start(); + + BoltRaftRpcFactory boltRaftRpcFactory = new BoltRaftRpcFactory(); + RpcClient boltRpcClient = boltRaftRpcFactory.createRpcClient(); + //boltRpcClient.registerConnectEventListener(null); + //boltRpcClient.invokeSync(addr, s, 0) + ReplicatorGroupOptions replicatorGroupOptions = new ReplicatorGroupOptions(); + RaftOptions raftOptions = new RaftOptions(); + replicatorGroupOptions.setRaftOptions(raftOptions); + replicatorGroupOptions.setNode(node); + + ReplicatorGroupImpl replicatorGroupImpl = new ReplicatorGroupImpl(); + replicatorGroupImpl.init(nodeId, replicatorGroupOptions); + + boltRpcClient.registerConnectEventListener(replicatorGroupImpl); + + } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java new file mode 100644 index 0000000..c2a38d7 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/RaftClosure.java @@ -0,0 +1,21 @@ +package com.yuandian.dataflow.statemachine; + +import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Status; + +public class RaftClosure implements Closure { + + @Override + public void run(Status status) { + + System.out.println("Task completed with status"+status.getCode()); + System.out.println("Task completed with "+status.getErrorMsg()); + System.out.println("Task completed with "+status.getRaftError()); + } + +// @Override +// public void onCommitted() { +// System.out.println("Task onCommitted"); +// } + +}