From ac2854ee7e7d3d53905e4edc05d82ffaac818f34 Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Mon, 8 Aug 2022 17:58:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=8Cgrpc=E9=9C=80=E8=A6=81=E4=B8=80?= =?UTF-8?q?=E8=B5=B7=E5=90=AF=E5=8A=A8=E8=B5=8B=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataflow/statemachine/StateMachine.java | 35 +++++++++++++------ .../dataflow/statemachine/StateServer.java | 18 +++++----- .../statemachine/StateServerFactory.java | 7 +++- .../statemachine/client/CounterClient.java | 1 + 4 files changed, 42 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 2f712be..2e8985e 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -57,6 +57,9 @@ import java.io.ObjectOutputStream; import java.nio.charset.Charset; import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -94,6 +97,9 @@ public class StateMachine extends BaseStateMachine { return leader.get(); } + + public Executor asyncExecutor = Executors.newFixedThreadPool(2); + /** * initialize the state machine by initilize the state machine storage and * calling the load method which reads the last applied command and restore it @@ -194,31 +200,40 @@ public class StateMachine extends BaseStateMachine { return last.getIndex(); } + + @Override public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { + + leader.set(newLeaderId == groupMemberId.getPeerId()); log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader()); + // super.notifyLeaderChanged(groupMemberId, newLeaderId); + + asyncExecutor.execute(()->{ + var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.CurrentPeerId())); + try { + var reply = StateServerFactory.send(op); + log.info("123 {}", reply); + } catch (IOException e) { + e.printStackTrace(); + } + }); + + + - var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.CurrentPeerId())); - try { - var reply = StateServerFactory.send(op); - log.info("{}", reply); - } catch (IOException e) { - e.printStackTrace(); - } if (MasterFactory.getMasterExecute().isAlive()) MasterFactory.getMasterExecute().interrupt(); if(isLeader()) MasterFactory.getMasterExecute().start(); - - - super.notifyLeaderChanged(groupMemberId, newLeaderId); + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java index d7fae07..c2c6ce7 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -67,8 +67,9 @@ public final class StateServer implements Closeable { public static HashMap activesPeers = new HashMap<>(); - private final RaftClient raftClient; + private RaftClient raftClient = null; private final RaftServer raftServer; + private final RaftGroup raftGroupConf; private final ProcessorServer processorServer; private PeerId peer = null; @@ -92,23 +93,23 @@ public final class StateServer implements Closeable { //create the counter state machine which hold the counter value StateMachine stateMachine = new StateMachine(); - RaftGroup raftGroup = RaftGroup.valueOf( + raftGroupConf = RaftGroup.valueOf( RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers); - log.info("raftGroup: {}", raftGroup); + log.info("raftGroup: {}", raftGroupConf); //create and start the Raft server this.raftServer = RaftServer.newBuilder() - .setGroup(raftGroup) + .setGroup(raftGroupConf) .setProperties(properties) .setServerId(curpeer.getId()) .setStateMachine(stateMachine) .build(); log.info("raftGroup: {}", this.raftServer); - this.raftServer.start(); + // create RaftClient - raftClient = buildClient(raftGroup); - + + this.processorServer = new ProcessorServer(); @@ -117,9 +118,10 @@ public final class StateServer implements Closeable { // block public void start() throws IOException, InterruptedException { - this.processorServer.getGrpcServer().start(); raftServer.start(); + this.processorServer.getGrpcServer().start(); this.peer = new PeerId(raftServer.getPeer(), this.processorServer.getGrpcServer().getPort()); + raftClient = buildClient(raftGroupConf); this.processorServer.getGrpcServer().awaitTermination(); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 6d398c2..8d6a105 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -3,6 +3,7 @@ package com.yuandian.dataflow.statemachine; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; @@ -38,7 +39,11 @@ public class StateServerFactory { } public static RaftClientReply send(Message msg) throws IOException { - return stateServer.getRaftClient().io().send(msg); } + + public static CompletableFuture asyncSend(Message msg) throws IOException { + + return stateServer.getRaftClient().async().send(msg); + } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java index f18cbb0..9fe6e64 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java @@ -98,6 +98,7 @@ public final class CounterClient { latch.countDown(); } + //shutdown the executor service and wait until they finish their work executorService.shutdown();