diff --git a/.vscode/launch.json b/.vscode/launch.json
index 80b7356..45b0b95 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -4,6 +4,13 @@
     // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
     "version": "0.2.0",
     "configurations": [
+        {
+            "type": "java",
+            "name": "Launch CounterClient",
+            "request": "launch",
+            "mainClass": "com.yuandian.dataflow.statemachine.client.CounterClient",
+            "projectName": "dataflow"
+        },
         {
             "type": "java",
             "name": "Launch Utils",
@@ -15,7 +22,7 @@
             "type": "java",
             "name": "Raft-0",
             "request": "launch",
-            "mainClass": "com.yuandian.dataflow.Server",
+            "mainClass": "com.yuandian.dataflow.statemachine.CounterServer",
             "projectName": "dataflow",
             "console": "integratedTerminal",
             "args": [
@@ -33,7 +40,7 @@
             "type": "java",
             "name": "Raft-1",
             "request": "launch",
-            "mainClass": "com.yuandian.dataflow.Server",
+            "mainClass": "com.yuandian.dataflow.statemachine.CounterServer",
             "projectName": "dataflow",
             "console": "integratedTerminal",
             "args": [
@@ -51,7 +58,7 @@
             "type": "java",
             "name": "Raft-2",
             "request": "launch",
-            "mainClass": "com.yuandian.dataflow.Server",
+            "mainClass": "com.yuandian.dataflow.statemachine.CounterServer",
             "projectName": "dataflow",
             "console": "integratedTerminal",
             "args": [
diff --git a/pom.xml b/pom.xml
index 95a1bb4..e27c845 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,45 @@
             <version>${snakeyaml.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-common</artifactId>
+            <version>2.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-grpc</artifactId>
+            <version>2.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-server</artifactId>
+            <version>2.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-thirdparty-misc</artifactId>
+            <version>1.0.1</version>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-proto</artifactId>
+            <version>2.3.0</version>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-tools</artifactId>
+            <version>2.3.0</version>
+          </dependency>
+        
+   
+        
+
 
         <dependency>
             <groupId>com.alipay.sofa</groupId>
@@ -106,6 +145,16 @@
             <version>${spring.boot.version}</version>
         </dependency>
 
+     
+        <!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>3.2.6</version>
+        </dependency>
+
+        
+
 
         <!-- protobuf 依赖 -->
 
@@ -251,7 +300,7 @@
                         <manifest>
                             <addClasspath>true</addClasspath>
                             <classpathPrefix>lib/</classpathPrefix>
-                             <mainClass>com.yuandian.dataflow.Server</mainClass>
+                             <mainClass>com.yuandian.dataflow.statemachine.CounterServer</mainClass>
                         </manifest>
                     </archive>
         
@@ -295,7 +344,7 @@
                     <appendAssemblyId>false</appendAssemblyId>
                     <archive>
                         <manifest>
-                            <mainClass>com.yuandian.dataflow.Server</mainClass>
+                            <mainClass>com.yuandian.dataflow.statemachine.CounterServer</mainClass>
                         </manifest>
                     </archive>
                     <descriptors>
diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java
index 7f34e85..28c322f 100644
--- a/src/main/java/com/yuandian/dataflow/Server.java
+++ b/src/main/java/com/yuandian/dataflow/Server.java
@@ -9,7 +9,7 @@ import org.slf4j.MarkerFactory;
 
 import com.alipay.sofa.jraft.JRaftUtils;
 import com.alipay.sofa.jraft.conf.Configuration;
-import com.yuandian.dataflow.statemachine.StateFactory;
+import com.yuandian.dataflow.statemachine_old.StateFactory;
 import com.yuandian.dataflow.utils.Utils;
 
 import io.netty.util.internal.StringUtil;
diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java
index 2479d83..ccdbaf9 100644
--- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java
@@ -16,15 +16,15 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
 import com.google.protobuf.Any;
 import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
 import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
-import com.yuandian.dataflow.statemachine.StateFactory;
-import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
-import com.yuandian.dataflow.statemachine.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine.master.MasterContext;
-import com.yuandian.dataflow.statemachine.master.MasterExecute;
-import com.yuandian.dataflow.statemachine.operate.Operate;
-import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
-import com.yuandian.dataflow.statemachine.state.State;
-import com.yuandian.dataflow.statemachine.state.WorkerState;
+import com.yuandian.dataflow.statemachine_old.StateFactory;
+import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister;
+import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
+import com.yuandian.dataflow.statemachine_old.master.MasterContext;
+import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
+import com.yuandian.dataflow.statemachine_old.operate.Operate;
+import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine_old.state.State;
+import com.yuandian.dataflow.statemachine_old.state.WorkerState;
 import com.yuandian.dataflow.utils.PacketsManager;
 import com.yuandian.dataflow.utils.Utils;
 
diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
index 24d9b9c..638101b 100644
--- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
@@ -17,13 +17,13 @@ import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.rpc.RpcContext;
 import com.alipay.sofa.jraft.rpc.RpcProcessor;
 import com.google.protobuf.Any;
-import com.yuandian.dataflow.statemachine.StateFactory;
-import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
-import com.yuandian.dataflow.statemachine.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine.operate.Operate;
-import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
-import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
-import com.yuandian.dataflow.statemachine.state.State;
+import com.yuandian.dataflow.statemachine_old.StateFactory;
+import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
+import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
+import com.yuandian.dataflow.statemachine_old.operate.Operate;
+import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
+import com.yuandian.dataflow.statemachine_old.state.State;
 
 import lombok.Getter;
 import lombok.Setter;
diff --git a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java
index 7f40148..4a9a0ab 100644
--- a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java
@@ -6,8 +6,8 @@ import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.rpc.RpcContext;
 import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import com.yuandian.dataflow.statemachine.StateFactory;
-import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
+import com.yuandian.dataflow.statemachine_old.StateFactory;
+import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
 
 import lombok.Getter;
 import lombok.Setter;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java b/src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java
new file mode 100644
index 0000000..392ebca
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/CounterServer.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.yuandian.dataflow.statemachine;
+
+import org.apache.ratis.conf.RaftProperties;
+// import org.apache.ratis.examples.common.Constants; 
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+
+import io.netty.util.internal.StringUtil;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Scanner;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Simplest Ratis server, use a simple state machine {@link CounterStateMachine}
+ * which maintain a counter across multi server.
+ * This server application designed to run several times with different
+ * parameters (1,2 or 3). server addresses hard coded in {@link Constants}
+ * <p>
+ * Run this application three times with three different parameter set-up a
+ * ratis cluster which maintain a counter value replicated in each server memory
+ */
+public final class CounterServer implements Closeable {
+  private final RaftServer server;
+
+  public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77");
+
+  public CounterServer(RaftPeer peer,  ArrayList<RaftPeer> peers, File storageDir) throws IOException {
+    //create a property object
+    RaftProperties properties = new RaftProperties();
+
+    //set the storage directory (different for each peer) in RaftProperty object
+    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
+
+    //set the port which server listen to in RaftProperty object
+    final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
+    GrpcConfigKeys.Server.setPort(properties, port);
+
+    //create the counter state machine which hold the counter value
+    CounterStateMachine counterStateMachine = new CounterStateMachine();
+
+    RaftGroup raftGroup = RaftGroup.valueOf(
+      RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers);
+      
+    //create and start the Raft server
+    this.server = RaftServer.newBuilder()
+        .setGroup(raftGroup)
+        .setProperties(properties)
+        .setServerId(peer.getId())
+        .setStateMachine(counterStateMachine)
+        .build();
+  }
+
+  public void start() throws IOException {
+    server.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    server.close();
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length < 1) {
+      System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
+      System.err.println("{serverIndex} could be 1, 2 or 3");
+      System.exit(1);
+    }
+
+    var peers = new ArrayList<RaftPeer>();
+    String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
+
+    for (int i = 0; i < addresses.length; i++) {
+      var port = addresses[i].split(":")[1];
+      peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build());
+    }
+
+    //find current peer object based on application parameter
+    final RaftPeer currentPeer = peers.get(Integer.parseInt(args[0]));
+
+    //start a counter server
+    final File storageDir = new File("./raftdata/" + currentPeer.getId());
+    final CounterServer counterServer = new CounterServer(currentPeer, peers, storageDir);
+    
+    counterServer.start();
+ 
+    
+
+
+    //exit when any input entered
+    Scanner scanner = new Scanner(System.in, UTF_8.name());
+    scanner.nextLine();
+    counterServer.close();
+  }
+}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java
new file mode 100644
index 0000000..73f1d98
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/CounterStateMachine.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.yuandian.dataflow.statemachine;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.JavaUtils;
+
+import com.yuandian.dataflow.statemachine_old.operate.Operate;
+import com.yuandian.dataflow.statemachine_old.state.State;
+import com.yuandian.dataflow.statemachine_old.state.WorkerState;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * State machine implementation for Counter server application. This class
+ * maintain a {@link AtomicInteger} object as a state and accept two commands:
+ * GET and INCREMENT, GET is a ReadOnly command which will be handled by
+ * {@code query} method however INCREMENT is a transactional command which
+ * will be handled by {@code applyTransaction}.
+ */
+@Slf4j
+public class CounterStateMachine extends BaseStateMachine {
+  private final SimpleStateMachineStorage storage =
+      new SimpleStateMachineStorage();
+
+
+  private State state = new State();
+
+  private AtomicInteger counter = new AtomicInteger(0);
+
+  private AtomicBoolean leader = new AtomicBoolean(false);
+
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+  private AutoCloseableLock readLock() {
+    return AutoCloseableLock.acquire(lock.readLock());
+  }
+
+  private AutoCloseableLock writeLock() {
+    return AutoCloseableLock.acquire(lock.writeLock());
+  }
+
+  public Boolean isLeader() {
+      return leader.get();
+  }
+
+  /**
+   * initialize the state machine by initilize the state machine storage and
+   * calling the load method which reads the last applied command and restore it
+   * in counter object)
+   *
+   * @param server      the current server information
+   * @param groupId     the cluster groupId
+   * @param raftStorage the raft storage which is used to keep raft related
+   *                    stuff
+   * @throws IOException if any error happens during load state
+   */
+  @Override
+  public void initialize(RaftServer server, RaftGroupId groupId,
+                         RaftStorage raftStorage) throws IOException {
+    super.initialize(server, groupId, raftStorage);
+    this.storage.init(raftStorage);
+    load(storage.getLatestSnapshot());
+  }
+
+  /**
+   * very similar to initialize method, but doesn't initialize the storage
+   * system because the state machine reinitialized from the PAUSE state and
+   * storage system initialized before.
+   *
+   * @throws IOException if any error happens during load state
+   */
+  @Override
+  public void reinitialize() throws IOException {
+    load(storage.getLatestSnapshot());
+  }
+
+  /**
+   * Store the current state as an snapshot file in the stateMachineStorage.
+   *
+   * @return the index of the snapshot
+   */
+  @Override
+  public long takeSnapshot() {
+    //get the last applied index
+    final TermIndex last = getLastAppliedTermIndex();
+
+    //create a file with a proper name to store the snapshot
+    final File snapshotFile =
+        storage.getSnapshotFile(last.getTerm(), last.getIndex());
+
+    //serialize the counter object and write it into the snapshot file
+    try (ObjectOutputStream out = new ObjectOutputStream(
+        new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
+      out.writeObject(counter);
+    } catch (IOException ioe) {
+      LOG.warn("Failed to write snapshot file \"" + snapshotFile
+          + "\", last applied index=" + last);
+    }
+
+    //return the index of the stored snapshot (which is the last applied one)
+    return last.getIndex();
+  }
+
+  /**
+   * Load the state of the state machine from the storage.
+   *
+   * @param snapshot to load
+   * @return the index of the snapshot or -1 if snapshot is invalid
+   * @throws IOException if any error happens during read from storage
+   */
+  private long load(SingleFileSnapshotInfo snapshot) throws IOException {
+    //check the snapshot nullity
+    if (snapshot == null) {
+      LOG.warn("The snapshot info is null.");
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+
+    //check the existance of the snapshot file
+    final File snapshotFile = snapshot.getFile().getPath().toFile();
+    if (!snapshotFile.exists()) {
+      LOG.warn("The snapshot file {} does not exist for snapshot {}",
+          snapshotFile, snapshot);
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+
+    //load the TermIndex object for the snapshot using the file name pattern of
+    // the snapshot
+    final TermIndex last =
+        SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
+
+    //read the file and cast it to the AtomicInteger and set the counter
+    try (ObjectInputStream in = new ObjectInputStream(
+        new BufferedInputStream(new FileInputStream(snapshotFile)))) {
+      //set the last applied termIndex to the termIndex of the snapshot
+      setLastAppliedTermIndex(last);
+
+      //read, cast and set the counter
+      counter = JavaUtils.cast(in.readObject());
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+
+    return last.getIndex();
+  }
+ 
+ 
+  @Override
+  public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
+    log.info("newLeaderId: {} groupMemberId: {}", newLeaderId  , groupMemberId.getPeerId());
+    leader.set(newLeaderId == groupMemberId.getPeerId());
+    super.notifyLeaderChanged(groupMemberId, newLeaderId);
+  }
+
+
+  
+
+  /**
+   * Handle GET command, which used by clients to get the counter value.
+   *
+   * @param request the GET message
+   * @return the Message containing the current counter value
+   */
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    String msg = request.getContent().toString(Charset.defaultCharset());
+ 
+    if (!msg.equals("GET")) {
+      return CompletableFuture.completedFuture(
+          Message.valueOf("Invalid Command"));
+    }
+
+    return CompletableFuture.completedFuture(
+        Message.valueOf(counter.toString()));
+ 
+  }
+
+  /**
+   * Apply the INCREMENT command by incrementing the counter object.
+   *
+   * @param trx the transaction context
+   * @return the message containing the updated counter value
+   */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    final RaftProtos.LogEntryProto entry = trx.getLogEntry();
+  
+    //check if the command is valid
+    // String logData = entry.getStateMachineLogEntry().getLogData()
+    //     .toString(Charset.defaultCharset());
+
+    Operate op ;
+    try {
+      op = (Operate)new ObjectInputStream(entry.getStateMachineLogEntry().getLogData().newInput()).readObject();
+    } catch (IOException | ClassNotFoundException e) {
+      e.printStackTrace();
+      return CompletableFuture.completedFuture(Message.valueOf("错误op"));
+    }
+
+    // if (!logData.equals("INCREMENT")) {
+    //   return CompletableFuture.completedFuture(
+    //       Message.valueOf("Invalid Command"));
+    // }
+    //update the last applied term and index
+    final long index = entry.getIndex();
+    
+    try(var r = writeLock()) {
+      switch(op.getType()) {
+        case ALLOCATE_PACKETS:
+          break;
+        case GET_STATE:
+          break;
+        case PUT_WORKERSTATE:
+          var ws = op.<WorkerState>getValue();
+          state.getWorkers().put(ws.getPeerId() , ws);
+          break;
+        case REMOVE:
+          break;
+        default:
+          break;
+        
+      }
+      updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
+    }
+    
+
+    //actual execution of the command: increment the counter
+    // counter.incrementAndGet();
+
+    //return the new value of the counter to the client
+    final CompletableFuture<Message> f =
+        CompletableFuture.completedFuture(Message.valueOf("put ok"));
+
+    //if leader, log the incremented value and it's log index
+    if (isLeader()) {
+      log.info("{}: Increment to {}", index, counter.toString());
+    }
+     
+
+    return f;
+  }
+}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java
new file mode 100644
index 0000000..a639c27
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.yuandian.dataflow.statemachine.client;
+
+
+ 
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.springframework.cglib.proxy.CallbackFilter;
+
+import com.yuandian.dataflow.statemachine.CounterServer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Counter client application, this application sends specific number of
+ * INCREMENT command to the Counter cluster and at the end sends a GET command
+ * and print the result
+ * <p>
+ * Parameter to this application indicate the number of INCREMENT command, if no
+ * parameter found, application use default value which is 10
+ */
+@Slf4j
+public final class CounterClient {
+
+  private CounterClient(){
+  }
+
+ 
+  public static void main(String[] args)
+      throws IOException, InterruptedException {
+    //indicate the number of INCREMENT command, set 10 if no parameter passed
+    int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
+
+    //build the counter cluster client
+    RaftClient raftClient = buildClient();
+
+    //use a executor service with 10 thread to send INCREMENT commands
+    // concurrently
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+    increment = 1000;
+    CountDownLatch latch = new CountDownLatch(increment);
+    //send INCREMENT commands concurrently
+    System.out.printf("Sending %d increment command...%n", increment);
+    Instant now = Instant.now();
+    for (int i = 0; i < increment; i++) {
+      executorService.submit(() ->
+          raftClient.io().send(Message.valueOf("INCREMENT")));
+          latch.countDown();
+    }
+    
+
+    //shutdown the executor service and wait until they finish their work
+    executorService.shutdown();
+    executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS);
+    latch.await();
+    log.info("{}", Duration.between(now, Instant.now()).toMillis());
+    //send GET command and print the response
+ 
+    
+    RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
+    String response = count.getMessage().getContent().toString(Charset.defaultCharset());
+    System.out.println(response);
+    
+  }
+
+  /**
+   * build the RaftClient instance which is used to communicate to
+   * Counter cluster
+   *
+   * @return the created client of Counter cluster
+   */
+  private static RaftClient buildClient() {
+    RaftProperties raftProperties = new RaftProperties();
+        //set the storage directory (different for each peer) in RaftProperty object
+    
+
+    //set the port which server listen to in RaftProperty object
+    // final int port = NetUtils.createSocketAddr("localhost:4440").getPort();
+    // GrpcConfigKeys.Server.setPort(raftProperties, port);
+
+    var peers = new ArrayList<RaftPeer>();
+    String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
+
+    for (int i = 0; i < addresses.length; i++) {
+      var port = addresses[i].split(":")[1];
+      peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build());
+    }
+    RaftGroup raftGroup = RaftGroup.valueOf(
+    RaftGroupId.valueOf(CounterServer.CLUSTER_GROUP_ID), peers);
+    
+    RaftClient.Builder builder = RaftClient.newBuilder()
+        .setProperties(raftProperties)
+        .setRaftGroup(raftGroup)
+        .setClientRpc(new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), raftProperties));
+    return builder.build();
+  }
+}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java
similarity index 77%
rename from src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java
index 9fde206..ea36356 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java
@@ -4,7 +4,7 @@
  * @author eson
  *2022年7月20日-10:00:05
  */
-package com.yuandian.dataflow.statemachine;
+package com.yuandian.dataflow.statemachine_old;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -18,13 +18,13 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
 import com.google.protobuf.Any;
 import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
 import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
-import com.yuandian.dataflow.statemachine.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine.master.MasterContext;
-import com.yuandian.dataflow.statemachine.master.MasterExecute;
-import com.yuandian.dataflow.statemachine.operate.Operate;
-import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
-import com.yuandian.dataflow.statemachine.state.State;
-import com.yuandian.dataflow.statemachine.state.WorkerState;
+import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
+import com.yuandian.dataflow.statemachine_old.master.MasterContext;
+import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
+import com.yuandian.dataflow.statemachine_old.operate.Operate;
+import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine_old.state.State;
+import com.yuandian.dataflow.statemachine_old.state.WorkerState;
 
 import lombok.Getter;
 import lombok.Setter;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java
similarity index 94%
rename from src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java
index 1c2a8e6..fdc26b0 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java
@@ -4,7 +4,7 @@
  * @author eson
  *2022年7月12日-13:36:24
  */
-package com.yuandian.dataflow.statemachine;
+package com.yuandian.dataflow.statemachine_old;
 
 import java.io.File;
 import java.lang.reflect.InvocationTargetException;
@@ -49,15 +49,15 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
 import com.alipay.sofa.jraft.util.BytesUtil;
 import com.alipay.sofa.jraft.util.Endpoint;
 import com.alipay.sofa.jraft.util.ThreadPoolUtil;
-import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
-import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
-import com.yuandian.dataflow.statemachine.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine.master.MasterExecute;
-import com.yuandian.dataflow.statemachine.operate.Operate;
-import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
-import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
-import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
-import com.yuandian.dataflow.statemachine.state.State;
+import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister;
+import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
+import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
+import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
+import com.yuandian.dataflow.statemachine_old.operate.Operate;
+import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
+import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest;
+import com.yuandian.dataflow.statemachine_old.state.State;
 import com.yuandian.dataflow.utils.Utils;
 
 import lombok.Getter;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java
similarity index 96%
rename from src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java
index e251d06..d0cf88e 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java
@@ -1,4 +1,4 @@
-package com.yuandian.dataflow.statemachine;
+package com.yuandian.dataflow.statemachine_old;
 
 import java.nio.ByteBuffer;
 import java.time.Instant;
@@ -18,11 +18,11 @@ import com.alipay.sofa.jraft.error.RaftException;
 import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
 import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
 import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
-import com.yuandian.dataflow.statemachine.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine.operate.Operate;
-import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
-import com.yuandian.dataflow.statemachine.state.State;
-import com.yuandian.dataflow.statemachine.state.WorkerState;
+import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
+import com.yuandian.dataflow.statemachine_old.operate.Operate;
+import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine_old.state.State;
+import com.yuandian.dataflow.statemachine_old.state.WorkerState;
 import com.yuandian.dataflow.utils.Utils;
 
 import lombok.extern.slf4j.Slf4j;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java b/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/MasterRegister.java
similarity index 88%
rename from src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/annotations/MasterRegister.java
index 968b73c..497114c 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/MasterRegister.java
@@ -4,7 +4,7 @@
  * @author eson
  *2022年7月21日-14:27:49
  */
-package com.yuandian.dataflow.statemachine.annotations;
+package com.yuandian.dataflow.statemachine_old.annotations;
 
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/annotations/WorkerRegister.java b/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/WorkerRegister.java
similarity index 87%
rename from src/main/java/com/yuandian/dataflow/statemachine/annotations/WorkerRegister.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/annotations/WorkerRegister.java
index e579111..d7eda8a 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/annotations/WorkerRegister.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/annotations/WorkerRegister.java
@@ -4,7 +4,7 @@
  * @author eson
  *2022年7月21日-14:27:49
  */
-package com.yuandian.dataflow.statemachine.annotations;
+package com.yuandian.dataflow.statemachine_old.annotations;
 
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java b/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java
similarity index 91%
rename from src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java
index f68b264..c828f97 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java
@@ -1,8 +1,8 @@
-package com.yuandian.dataflow.statemachine.closure;
+package com.yuandian.dataflow.statemachine_old.closure;
 
 import com.alipay.sofa.jraft.Closure;
 import com.alipay.sofa.jraft.entity.PeerId;
-import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
+import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
 
 import lombok.Getter;
 import lombok.Setter;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java b/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterContext.java
similarity index 95%
rename from src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterContext.java
index d9cd8ba..9db19bd 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterContext.java
@@ -1,4 +1,4 @@
-package com.yuandian.dataflow.statemachine.master;
+package com.yuandian.dataflow.statemachine_old.master;
 
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java b/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterExecute.java
similarity index 66%
rename from src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterExecute.java
index d1542eb..d34411b 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/master/MasterExecute.java
@@ -1,4 +1,4 @@
-package com.yuandian.dataflow.statemachine.master;
+package com.yuandian.dataflow.statemachine_old.master;
 
 /**
  *  Master的主线程循环
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/Operate.java
similarity index 87%
rename from src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/operate/Operate.java
index 64e18eb..0a269e7 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/Operate.java
@@ -1,15 +1,15 @@
-package com.yuandian.dataflow.statemachine.operate;
+package com.yuandian.dataflow.statemachine_old.operate;
 
 import java.io.Serializable;
 
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.error.RemotingException;
 import com.alipay.sofa.jraft.rpc.InvokeCallback;
-import com.yuandian.dataflow.statemachine.StateFactory;
-import com.yuandian.dataflow.statemachine.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
-import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
-import com.yuandian.dataflow.statemachine.state.WorkerState;
+import com.yuandian.dataflow.statemachine_old.StateFactory;
+import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
+import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor;
+import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
+import com.yuandian.dataflow.statemachine_old.state.WorkerState;
 import com.yuandian.dataflow.utils.PacketsManager;
 
 import lombok.Data;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java
similarity index 85%
rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java
index b75bdc5..32c8ffc 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java
@@ -4,7 +4,7 @@
  * @author eson
  *2022年7月12日-11:10:54
  */
-package com.yuandian.dataflow.statemachine.rpc;
+package com.yuandian.dataflow.statemachine_old.rpc;
 
 import java.io.Serializable;
 
@@ -12,10 +12,10 @@ import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.error.RaftError;
 import com.alipay.sofa.jraft.rpc.RpcContext;
 import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import com.yuandian.dataflow.statemachine.StateFactory;
-import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
-import com.yuandian.dataflow.statemachine.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine.operate.Operate;
+import com.yuandian.dataflow.statemachine_old.StateFactory;
+import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
+import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
+import com.yuandian.dataflow.statemachine_old.operate.Operate;
 
 import javassist.ClassPath;
  
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/RaftResponse.java b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java
similarity index 92%
rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/RaftResponse.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java
index f931b7f..ce3b494 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/RaftResponse.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java
@@ -4,7 +4,7 @@
  * @author eson
  *2022年7月13日-09:07:22
  */
-package com.yuandian.dataflow.statemachine.rpc;
+package com.yuandian.dataflow.statemachine_old.rpc;
 
 import java.io.Serializable;
 
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java b/src/main/java/com/yuandian/dataflow/statemachine_old/state/State.java
similarity index 93%
rename from src/main/java/com/yuandian/dataflow/statemachine/state/State.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/state/State.java
index c48a18f..eb3b361 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/state/State.java
@@ -4,7 +4,7 @@
  * @author eson
  *2022年7月13日-09:11:26
  */
-package com.yuandian.dataflow.statemachine.state;
+package com.yuandian.dataflow.statemachine_old.state;
 
 import java.io.Serializable;
 
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine_old/state/WorkerState.java
similarity index 94%
rename from src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java
rename to src/main/java/com/yuandian/dataflow/statemachine_old/state/WorkerState.java
index d837c49..ec2f5f2 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine_old/state/WorkerState.java
@@ -4,7 +4,7 @@
  * @author eson
  *2022年7月15日-10:04:00
  */
-package com.yuandian.dataflow.statemachine.state;
+package com.yuandian.dataflow.statemachine_old.state;
 
 import java.io.Serializable;
 import java.time.Instant;
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 4adb380..84633c8 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -24,7 +24,7 @@
       <appender-ref ref="CONSOLE" />
    </root> -->
    
-   <logger name="com.yuandian.dataflow" level="debug|info">
+   <logger name="com.yuandian.dataflow" level="debug|info|error">
       <appender-ref ref="CONSOLE"/>
   </logger>
 </configuration>
\ No newline at end of file