From cf4ff63b31da6ee9ee1188d31a4307a8a3e1d06f Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Thu, 4 Aug 2022 18:07:48 +0800 Subject: [PATCH] =?UTF-8?q?ratis=20grpc=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 13 +- pom.xml | 53 +++- .../java/com/yuandian/dataflow/Server.java | 2 +- .../dataflow/controller/MasterProcessor.java | 18 +- .../dataflow/controller/PacketsProcessor.java | 14 +- .../controller/TransferLeaderProcessor.java | 4 +- .../dataflow/statemachine/CounterServer.java | 124 ++++++++ .../statemachine/CounterStateMachine.java | 288 ++++++++++++++++++ .../statemachine/client/CounterClient.java | 137 +++++++++ .../MasterFactory.java | 16 +- .../StateFactory.java | 20 +- .../StateMachine.java | 12 +- .../annotations/MasterRegister.java | 2 +- .../annotations/WorkerRegister.java | 2 +- .../closure/GenericClosure.java | 4 +- .../master/MasterContext.java | 2 +- .../master/MasterExecute.java | 2 +- .../operate/Operate.java | 12 +- .../rpc/OperateProcessor.java | 10 +- .../rpc/RaftResponse.java | 2 +- .../state/State.java | 2 +- .../state/WorkerState.java | 2 +- src/main/resources/logback.xml | 2 +- 23 files changed, 674 insertions(+), 69 deletions(-) create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/MasterFactory.java (77%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/StateFactory.java (94%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/StateMachine.java (96%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/annotations/MasterRegister.java (88%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/annotations/WorkerRegister.java (87%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/closure/GenericClosure.java (91%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/master/MasterContext.java (95%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/master/MasterExecute.java (66%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/operate/Operate.java (87%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/rpc/OperateProcessor.java (85%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/rpc/RaftResponse.java (92%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/state/State.java (93%) rename src/main/java/com/yuandian/dataflow/{statemachine => statemachine_old}/state/WorkerState.java (94%) diff --git a/.vscode/launch.json b/.vscode/launch.json index 80b7356..45b0b95 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,6 +4,13 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { + "type": "java", + "name": "Launch CounterClient", + "request": "launch", + "mainClass": "com.yuandian.dataflow.statemachine.client.CounterClient", + "projectName": "dataflow" + }, { "type": "java", "name": "Launch Utils", @@ -15,7 +22,7 @@ "type": "java", "name": "Raft-0", "request": "launch", - "mainClass": "com.yuandian.dataflow.Server", + "mainClass": "com.yuandian.dataflow.statemachine.CounterServer", "projectName": "dataflow", "console": "integratedTerminal", "args": [ @@ -33,7 +40,7 @@ "type": "java", "name": "Raft-1", "request": "launch", - "mainClass": "com.yuandian.dataflow.Server", + "mainClass": "com.yuandian.dataflow.statemachine.CounterServer", "projectName": "dataflow", "console": "integratedTerminal", "args": [ @@ -51,7 +58,7 @@ "type": "java", "name": "Raft-2", "request": "launch", - "mainClass": "com.yuandian.dataflow.Server", + "mainClass": "com.yuandian.dataflow.statemachine.CounterServer", "projectName": "dataflow", "console": "integratedTerminal", "args": [ diff --git a/pom.xml b/pom.xml index 95a1bb4..e27c845 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,45 @@ ${snakeyaml.version} + + org.apache.ratis + ratis-common + 2.3.0 + + + + org.apache.ratis + ratis-grpc + 2.3.0 + + + + org.apache.ratis + ratis-server + 2.3.0 + + + + org.apache.ratis + ratis-thirdparty-misc + 1.0.1 + + + + org.apache.ratis + ratis-proto + 2.3.0 + + + + org.apache.ratis + ratis-tools + 2.3.0 + + + + + com.alipay.sofa @@ -106,6 +145,16 @@ ${spring.boot.version} + + + + io.dropwizard.metrics + metrics-core + 3.2.6 + + + + @@ -251,7 +300,7 @@ true lib/ - com.yuandian.dataflow.Server + com.yuandian.dataflow.statemachine.CounterServer @@ -295,7 +344,7 @@ false - com.yuandian.dataflow.Server + com.yuandian.dataflow.statemachine.CounterServer diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 7f34e85..28c322f 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -9,7 +9,7 @@ import org.slf4j.MarkerFactory; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.conf.Configuration; -import com.yuandian.dataflow.statemachine.StateFactory; +import com.yuandian.dataflow.statemachine_old.StateFactory; import com.yuandian.dataflow.utils.Utils; import io.netty.util.internal.StringUtil; diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index 2479d83..ccdbaf9 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -16,15 +16,15 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.google.protobuf.Any; import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; -import com.yuandian.dataflow.statemachine.StateFactory; -import com.yuandian.dataflow.statemachine.annotations.MasterRegister; -import com.yuandian.dataflow.statemachine.closure.GenericClosure; -import com.yuandian.dataflow.statemachine.master.MasterContext; -import com.yuandian.dataflow.statemachine.master.MasterExecute; -import com.yuandian.dataflow.statemachine.operate.Operate; -import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; -import com.yuandian.dataflow.statemachine.state.State; -import com.yuandian.dataflow.statemachine.state.WorkerState; +import com.yuandian.dataflow.statemachine_old.StateFactory; +import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister; +import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; +import com.yuandian.dataflow.statemachine_old.master.MasterContext; +import com.yuandian.dataflow.statemachine_old.master.MasterExecute; +import com.yuandian.dataflow.statemachine_old.operate.Operate; +import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType; +import com.yuandian.dataflow.statemachine_old.state.State; +import com.yuandian.dataflow.statemachine_old.state.WorkerState; import com.yuandian.dataflow.utils.PacketsManager; import com.yuandian.dataflow.utils.Utils; diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 24d9b9c..638101b 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -17,13 +17,13 @@ import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.google.protobuf.Any; -import com.yuandian.dataflow.statemachine.StateFactory; -import com.yuandian.dataflow.statemachine.annotations.WorkerRegister; -import com.yuandian.dataflow.statemachine.closure.GenericClosure; -import com.yuandian.dataflow.statemachine.operate.Operate; -import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; -import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine_old.StateFactory; +import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister; +import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; +import com.yuandian.dataflow.statemachine_old.operate.Operate; +import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType; +import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; +import com.yuandian.dataflow.statemachine_old.state.State; import lombok.Getter; import lombok.Setter; diff --git a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java index 7f40148..4a9a0ab 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java @@ -6,8 +6,8 @@ import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; -import com.yuandian.dataflow.statemachine.StateFactory; -import com.yuandian.dataflow.statemachine.annotations.WorkerRegister; +import com.yuandian.dataflow.statemachine_old.StateFactory; +import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister; import lombok.Getter; import lombok.Setter; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java b/src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java new file mode 100644 index 0000000..392ebca --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.yuandian.dataflow.statemachine; + +import org.apache.ratis.conf.RaftProperties; +// import org.apache.ratis.examples.common.Constants; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.util.NetUtils; + +import io.netty.util.internal.StringUtil; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Scanner; +import java.util.UUID; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Simplest Ratis server, use a simple state machine {@link CounterStateMachine} + * which maintain a counter across multi server. + * This server application designed to run several times with different + * parameters (1,2 or 3). server addresses hard coded in {@link Constants} + *

+ * Run this application three times with three different parameter set-up a + * ratis cluster which maintain a counter value replicated in each server memory + */ +public final class CounterServer implements Closeable { + private final RaftServer server; + + public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77"); + + public CounterServer(RaftPeer peer, ArrayList peers, File storageDir) throws IOException { + //create a property object + RaftProperties properties = new RaftProperties(); + + //set the storage directory (different for each peer) in RaftProperty object + RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir)); + + //set the port which server listen to in RaftProperty object + final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort(); + GrpcConfigKeys.Server.setPort(properties, port); + + //create the counter state machine which hold the counter value + CounterStateMachine counterStateMachine = new CounterStateMachine(); + + RaftGroup raftGroup = RaftGroup.valueOf( + RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers); + + //create and start the Raft server + this.server = RaftServer.newBuilder() + .setGroup(raftGroup) + .setProperties(properties) + .setServerId(peer.getId()) + .setStateMachine(counterStateMachine) + .build(); + } + + public void start() throws IOException { + server.start(); + } + + @Override + public void close() throws IOException { + server.close(); + } + + public static void main(String[] args) throws IOException { + if (args.length < 1) { + System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}"); + System.err.println("{serverIndex} could be 1, 2 or 3"); + System.exit(1); + } + + var peers = new ArrayList(); + String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; + + for (int i = 0; i < addresses.length; i++) { + var port = addresses[i].split(":")[1]; + peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build()); + } + + //find current peer object based on application parameter + final RaftPeer currentPeer = peers.get(Integer.parseInt(args[0])); + + //start a counter server + final File storageDir = new File("./raftdata/" + currentPeer.getId()); + final CounterServer counterServer = new CounterServer(currentPeer, peers, storageDir); + + counterServer.start(); + + + + + //exit when any input entered + Scanner scanner = new Scanner(System.in, UTF_8.name()); + scanner.nextLine(); + counterServer.close(); + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java new file mode 100644 index 0000000..73f1d98 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.yuandian.dataflow.statemachine; + +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftGroupMemberId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.raftlog.RaftLog; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; +import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.JavaUtils; + +import com.yuandian.dataflow.statemachine_old.operate.Operate; +import com.yuandian.dataflow.statemachine_old.state.State; +import com.yuandian.dataflow.statemachine_old.state.WorkerState; + +import lombok.extern.slf4j.Slf4j; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.charset.Charset; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * State machine implementation for Counter server application. This class + * maintain a {@link AtomicInteger} object as a state and accept two commands: + * GET and INCREMENT, GET is a ReadOnly command which will be handled by + * {@code query} method however INCREMENT is a transactional command which + * will be handled by {@code applyTransaction}. + */ +@Slf4j +public class CounterStateMachine extends BaseStateMachine { + private final SimpleStateMachineStorage storage = + new SimpleStateMachineStorage(); + + + private State state = new State(); + + private AtomicInteger counter = new AtomicInteger(0); + + private AtomicBoolean leader = new AtomicBoolean(false); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + private AutoCloseableLock readLock() { + return AutoCloseableLock.acquire(lock.readLock()); + } + + private AutoCloseableLock writeLock() { + return AutoCloseableLock.acquire(lock.writeLock()); + } + + public Boolean isLeader() { + return leader.get(); + } + + /** + * initialize the state machine by initilize the state machine storage and + * calling the load method which reads the last applied command and restore it + * in counter object) + * + * @param server the current server information + * @param groupId the cluster groupId + * @param raftStorage the raft storage which is used to keep raft related + * stuff + * @throws IOException if any error happens during load state + */ + @Override + public void initialize(RaftServer server, RaftGroupId groupId, + RaftStorage raftStorage) throws IOException { + super.initialize(server, groupId, raftStorage); + this.storage.init(raftStorage); + load(storage.getLatestSnapshot()); + } + + /** + * very similar to initialize method, but doesn't initialize the storage + * system because the state machine reinitialized from the PAUSE state and + * storage system initialized before. + * + * @throws IOException if any error happens during load state + */ + @Override + public void reinitialize() throws IOException { + load(storage.getLatestSnapshot()); + } + + /** + * Store the current state as an snapshot file in the stateMachineStorage. + * + * @return the index of the snapshot + */ + @Override + public long takeSnapshot() { + //get the last applied index + final TermIndex last = getLastAppliedTermIndex(); + + //create a file with a proper name to store the snapshot + final File snapshotFile = + storage.getSnapshotFile(last.getTerm(), last.getIndex()); + + //serialize the counter object and write it into the snapshot file + try (ObjectOutputStream out = new ObjectOutputStream( + new BufferedOutputStream(new FileOutputStream(snapshotFile)))) { + out.writeObject(counter); + } catch (IOException ioe) { + LOG.warn("Failed to write snapshot file \"" + snapshotFile + + "\", last applied index=" + last); + } + + //return the index of the stored snapshot (which is the last applied one) + return last.getIndex(); + } + + /** + * Load the state of the state machine from the storage. + * + * @param snapshot to load + * @return the index of the snapshot or -1 if snapshot is invalid + * @throws IOException if any error happens during read from storage + */ + private long load(SingleFileSnapshotInfo snapshot) throws IOException { + //check the snapshot nullity + if (snapshot == null) { + LOG.warn("The snapshot info is null."); + return RaftLog.INVALID_LOG_INDEX; + } + + //check the existance of the snapshot file + final File snapshotFile = snapshot.getFile().getPath().toFile(); + if (!snapshotFile.exists()) { + LOG.warn("The snapshot file {} does not exist for snapshot {}", + snapshotFile, snapshot); + return RaftLog.INVALID_LOG_INDEX; + } + + //load the TermIndex object for the snapshot using the file name pattern of + // the snapshot + final TermIndex last = + SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile); + + //read the file and cast it to the AtomicInteger and set the counter + try (ObjectInputStream in = new ObjectInputStream( + new BufferedInputStream(new FileInputStream(snapshotFile)))) { + //set the last applied termIndex to the termIndex of the snapshot + setLastAppliedTermIndex(last); + + //read, cast and set the counter + counter = JavaUtils.cast(in.readObject()); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + + return last.getIndex(); + } + + + @Override + public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { + log.info("newLeaderId: {} groupMemberId: {}", newLeaderId , groupMemberId.getPeerId()); + leader.set(newLeaderId == groupMemberId.getPeerId()); + super.notifyLeaderChanged(groupMemberId, newLeaderId); + } + + + + + /** + * Handle GET command, which used by clients to get the counter value. + * + * @param request the GET message + * @return the Message containing the current counter value + */ + @Override + public CompletableFuture query(Message request) { + String msg = request.getContent().toString(Charset.defaultCharset()); + + if (!msg.equals("GET")) { + return CompletableFuture.completedFuture( + Message.valueOf("Invalid Command")); + } + + return CompletableFuture.completedFuture( + Message.valueOf(counter.toString())); + + } + + /** + * Apply the INCREMENT command by incrementing the counter object. + * + * @param trx the transaction context + * @return the message containing the updated counter value + */ + @Override + public CompletableFuture applyTransaction(TransactionContext trx) { + final RaftProtos.LogEntryProto entry = trx.getLogEntry(); + + //check if the command is valid + // String logData = entry.getStateMachineLogEntry().getLogData() + // .toString(Charset.defaultCharset()); + + Operate op ; + try { + op = (Operate)new ObjectInputStream(entry.getStateMachineLogEntry().getLogData().newInput()).readObject(); + } catch (IOException | ClassNotFoundException e) { + e.printStackTrace(); + return CompletableFuture.completedFuture(Message.valueOf("错误op")); + } + + // if (!logData.equals("INCREMENT")) { + // return CompletableFuture.completedFuture( + // Message.valueOf("Invalid Command")); + // } + //update the last applied term and index + final long index = entry.getIndex(); + + try(var r = writeLock()) { + switch(op.getType()) { + case ALLOCATE_PACKETS: + break; + case GET_STATE: + break; + case PUT_WORKERSTATE: + var ws = op.getValue(); + state.getWorkers().put(ws.getPeerId() , ws); + break; + case REMOVE: + break; + default: + break; + + } + updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); + } + + + //actual execution of the command: increment the counter + // counter.incrementAndGet(); + + //return the new value of the counter to the client + final CompletableFuture f = + CompletableFuture.completedFuture(Message.valueOf("put ok")); + + //if leader, log the incremented value and it's log index + if (isLeader()) { + log.info("{}: Increment to {}", index, counter.toString()); + } + + + return f; + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java new file mode 100644 index 0000000..a639c27 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.yuandian.dataflow.statemachine.client; + + + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.Parameters; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcFactory; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.NetUtils; +import org.springframework.cglib.proxy.CallbackFilter; + +import com.yuandian.dataflow.statemachine.CounterServer; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Counter client application, this application sends specific number of + * INCREMENT command to the Counter cluster and at the end sends a GET command + * and print the result + *

+ * Parameter to this application indicate the number of INCREMENT command, if no + * parameter found, application use default value which is 10 + */ +@Slf4j +public final class CounterClient { + + private CounterClient(){ + } + + + public static void main(String[] args) + throws IOException, InterruptedException { + //indicate the number of INCREMENT command, set 10 if no parameter passed + int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10; + + //build the counter cluster client + RaftClient raftClient = buildClient(); + + //use a executor service with 10 thread to send INCREMENT commands + // concurrently + ExecutorService executorService = Executors.newFixedThreadPool(10); + + increment = 1000; + CountDownLatch latch = new CountDownLatch(increment); + //send INCREMENT commands concurrently + System.out.printf("Sending %d increment command...%n", increment); + Instant now = Instant.now(); + for (int i = 0; i < increment; i++) { + executorService.submit(() -> + raftClient.io().send(Message.valueOf("INCREMENT"))); + latch.countDown(); + } + + + //shutdown the executor service and wait until they finish their work + executorService.shutdown(); + executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS); + latch.await(); + log.info("{}", Duration.between(now, Instant.now()).toMillis()); + //send GET command and print the response + + + RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET")); + String response = count.getMessage().getContent().toString(Charset.defaultCharset()); + System.out.println(response); + + } + + /** + * build the RaftClient instance which is used to communicate to + * Counter cluster + * + * @return the created client of Counter cluster + */ + private static RaftClient buildClient() { + RaftProperties raftProperties = new RaftProperties(); + //set the storage directory (different for each peer) in RaftProperty object + + + //set the port which server listen to in RaftProperty object + // final int port = NetUtils.createSocketAddr("localhost:4440").getPort(); + // GrpcConfigKeys.Server.setPort(raftProperties, port); + + var peers = new ArrayList(); + String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; + + for (int i = 0; i < addresses.length; i++) { + var port = addresses[i].split(":")[1]; + peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build()); + } + RaftGroup raftGroup = RaftGroup.valueOf( + RaftGroupId.valueOf(CounterServer.CLUSTER_GROUP_ID), peers); + + RaftClient.Builder builder = RaftClient.newBuilder() + .setProperties(raftProperties) + .setRaftGroup(raftGroup) + .setClientRpc(new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), raftProperties)); + return builder.build(); + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java similarity index 77% rename from src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java index 9fde206..ea36356 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java @@ -4,7 +4,7 @@ * @author eson *2022年7月20日-10:00:05 */ -package com.yuandian.dataflow.statemachine; +package com.yuandian.dataflow.statemachine_old; import java.time.Duration; import java.time.Instant; @@ -18,13 +18,13 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.google.protobuf.Any; import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; -import com.yuandian.dataflow.statemachine.closure.GenericClosure; -import com.yuandian.dataflow.statemachine.master.MasterContext; -import com.yuandian.dataflow.statemachine.master.MasterExecute; -import com.yuandian.dataflow.statemachine.operate.Operate; -import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; -import com.yuandian.dataflow.statemachine.state.State; -import com.yuandian.dataflow.statemachine.state.WorkerState; +import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; +import com.yuandian.dataflow.statemachine_old.master.MasterContext; +import com.yuandian.dataflow.statemachine_old.master.MasterExecute; +import com.yuandian.dataflow.statemachine_old.operate.Operate; +import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType; +import com.yuandian.dataflow.statemachine_old.state.State; +import com.yuandian.dataflow.statemachine_old.state.WorkerState; import lombok.Getter; import lombok.Setter; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java similarity index 94% rename from src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java index 1c2a8e6..fdc26b0 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java @@ -4,7 +4,7 @@ * @author eson *2022年7月12日-13:36:24 */ -package com.yuandian.dataflow.statemachine; +package com.yuandian.dataflow.statemachine_old; import java.io.File; import java.lang.reflect.InvocationTargetException; @@ -49,15 +49,15 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ThreadPoolUtil; -import com.yuandian.dataflow.statemachine.annotations.MasterRegister; -import com.yuandian.dataflow.statemachine.annotations.WorkerRegister; -import com.yuandian.dataflow.statemachine.closure.GenericClosure; -import com.yuandian.dataflow.statemachine.master.MasterExecute; -import com.yuandian.dataflow.statemachine.operate.Operate; -import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; -import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; -import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister; +import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister; +import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; +import com.yuandian.dataflow.statemachine_old.master.MasterExecute; +import com.yuandian.dataflow.statemachine_old.operate.Operate; +import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType; +import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; +import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest; +import com.yuandian.dataflow.statemachine_old.state.State; import com.yuandian.dataflow.utils.Utils; import lombok.Getter; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java similarity index 96% rename from src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java index e251d06..d0cf88e 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.statemachine; +package com.yuandian.dataflow.statemachine_old; import java.nio.ByteBuffer; import java.time.Instant; @@ -18,11 +18,11 @@ import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; -import com.yuandian.dataflow.statemachine.closure.GenericClosure; -import com.yuandian.dataflow.statemachine.operate.Operate; -import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; -import com.yuandian.dataflow.statemachine.state.State; -import com.yuandian.dataflow.statemachine.state.WorkerState; +import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; +import com.yuandian.dataflow.statemachine_old.operate.Operate; +import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType; +import com.yuandian.dataflow.statemachine_old.state.State; +import com.yuandian.dataflow.statemachine_old.state.WorkerState; import com.yuandian.dataflow.utils.Utils; import lombok.extern.slf4j.Slf4j; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java b/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/MasterRegister.java similarity index 88% rename from src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/annotations/MasterRegister.java index 968b73c..497114c 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/MasterRegister.java @@ -4,7 +4,7 @@ * @author eson *2022年7月21日-14:27:49 */ -package com.yuandian.dataflow.statemachine.annotations; +package com.yuandian.dataflow.statemachine_old.annotations; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/annotations/WorkerRegister.java b/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/WorkerRegister.java similarity index 87% rename from src/main/java/com/yuandian/dataflow/statemachine/annotations/WorkerRegister.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/annotations/WorkerRegister.java index e579111..d7eda8a 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/annotations/WorkerRegister.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/WorkerRegister.java @@ -4,7 +4,7 @@ * @author eson *2022年7月21日-14:27:49 */ -package com.yuandian.dataflow.statemachine.annotations; +package com.yuandian.dataflow.statemachine_old.annotations; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java b/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java similarity index 91% rename from src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java index f68b264..c828f97 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java @@ -1,8 +1,8 @@ -package com.yuandian.dataflow.statemachine.closure; +package com.yuandian.dataflow.statemachine_old.closure; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.entity.PeerId; -import com.yuandian.dataflow.statemachine.rpc.RaftResponse; +import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; import lombok.Getter; import lombok.Setter; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java b/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterContext.java similarity index 95% rename from src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterContext.java index d9cd8ba..9db19bd 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterContext.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.statemachine.master; +package com.yuandian.dataflow.statemachine_old.master; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java b/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterExecute.java similarity index 66% rename from src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterExecute.java index d1542eb..d34411b 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterExecute.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.statemachine.master; +package com.yuandian.dataflow.statemachine_old.master; /** * Master的主线程循环 diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/Operate.java similarity index 87% rename from src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/operate/Operate.java index 64e18eb..0a269e7 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/Operate.java @@ -1,15 +1,15 @@ -package com.yuandian.dataflow.statemachine.operate; +package com.yuandian.dataflow.statemachine_old.operate; import java.io.Serializable; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.rpc.InvokeCallback; -import com.yuandian.dataflow.statemachine.StateFactory; -import com.yuandian.dataflow.statemachine.closure.GenericClosure; -import com.yuandian.dataflow.statemachine.rpc.OperateProcessor; -import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.state.WorkerState; +import com.yuandian.dataflow.statemachine_old.StateFactory; +import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; +import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor; +import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; +import com.yuandian.dataflow.statemachine_old.state.WorkerState; import com.yuandian.dataflow.utils.PacketsManager; import lombok.Data; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java similarity index 85% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java index b75bdc5..32c8ffc 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java @@ -4,7 +4,7 @@ * @author eson *2022年7月12日-11:10:54 */ -package com.yuandian.dataflow.statemachine.rpc; +package com.yuandian.dataflow.statemachine_old.rpc; import java.io.Serializable; @@ -12,10 +12,10 @@ import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; -import com.yuandian.dataflow.statemachine.StateFactory; -import com.yuandian.dataflow.statemachine.annotations.WorkerRegister; -import com.yuandian.dataflow.statemachine.closure.GenericClosure; -import com.yuandian.dataflow.statemachine.operate.Operate; +import com.yuandian.dataflow.statemachine_old.StateFactory; +import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister; +import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; +import com.yuandian.dataflow.statemachine_old.operate.Operate; import javassist.ClassPath; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/RaftResponse.java b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java similarity index 92% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/RaftResponse.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java index f931b7f..ce3b494 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/RaftResponse.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java @@ -4,7 +4,7 @@ * @author eson *2022年7月13日-09:07:22 */ -package com.yuandian.dataflow.statemachine.rpc; +package com.yuandian.dataflow.statemachine_old.rpc; import java.io.Serializable; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java b/src/main/java/com/yuandian/dataflow/statemachine_old/state/State.java similarity index 93% rename from src/main/java/com/yuandian/dataflow/statemachine/state/State.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/state/State.java index c48a18f..eb3b361 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/state/State.java @@ -4,7 +4,7 @@ * @author eson *2022年7月13日-09:11:26 */ -package com.yuandian.dataflow.statemachine.state; +package com.yuandian.dataflow.statemachine_old.state; import java.io.Serializable; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine_old/state/WorkerState.java similarity index 94% rename from src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java rename to src/main/java/com/yuandian/dataflow/statemachine_old/state/WorkerState.java index d837c49..ec2f5f2 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java +++ b/src/main/java/com/yuandian/dataflow/statemachine_old/state/WorkerState.java @@ -4,7 +4,7 @@ * @author eson *2022年7月15日-10:04:00 */ -package com.yuandian.dataflow.statemachine.state; +package com.yuandian.dataflow.statemachine_old.state; import java.io.Serializable; import java.time.Instant; diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 4adb380..84633c8 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -24,7 +24,7 @@ --> - + \ No newline at end of file