需要加锁

This commit is contained in:
huangsimin 2022-08-09 00:24:07 +08:00
parent ac2854ee7e
commit e063b5471c
6 changed files with 40 additions and 28 deletions

View File

@ -1,6 +1,7 @@
package com.yuandian.dataflow.statemachine; package com.yuandian.dataflow.statemachine;
import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -11,24 +12,28 @@ import lombok.extern.slf4j.Slf4j;
@Getter @Getter
@Setter @Setter
@ToString @ToString
public class PeerId { public class Peer {
public PeerId(RaftPeer raftPeer, int processorPort) { public Peer(RaftPeerId raftPeer, int processorPort) {
this.raftPeer = raftPeer; this.raftPeerId = raftPeer;
this.processorPort = processorPort; this.processorPort = processorPort;
} }
private RaftPeer raftPeer; public Peer() {
}
private RaftPeerId raftPeerId;
private int processorPort; private int processorPort;
@Override @Override
public boolean equals(Object arg0) { public boolean equals(Object arg0) {
return getRaftPeer().getId().hashCode() == ((PeerId)arg0).getRaftPeer().getId().hashCode(); return getRaftPeerId().hashCode() == ((Peer)arg0).getRaftPeerId().hashCode();
} }
@Override @Override
public int hashCode() { public int hashCode() {
return getRaftPeer().getId().hashCode() ; return getRaftPeerId().hashCode() ;
} }
} }

View File

@ -20,6 +20,7 @@ package com.yuandian.dataflow.statemachine;
import org.apache.http.entity.InputStreamEntity; import org.apache.http.entity.InputStreamEntity;
import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupId;
@ -201,20 +202,20 @@ public class StateMachine extends BaseStateMachine {
return last.getIndex(); return last.getIndex();
} }
@Override @Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
log.info("msg {}", groupMemberId.getPeerId());
StateServerFactory.setCurrentPeerId( new Peer(groupMemberId.getPeerId(), StateServerFactory.stateServer.getProcessorServer().getGrpcServer().getPort()) );
log.info("msg {}", groupMemberId.getPeerId());
leader.set(newLeaderId == groupMemberId.getPeerId()); leader.set(newLeaderId == groupMemberId.getPeerId());
log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader()); log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader());
// super.notifyLeaderChanged(groupMemberId, newLeaderId); // super.notifyLeaderChanged(groupMemberId, newLeaderId);
asyncExecutor.execute(()->{ asyncExecutor.execute(()->{
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.CurrentPeerId())); log.info("asyncExecutor");
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId() ));
try { try {
var reply = StateServerFactory.send(op); var reply = StateServerFactory.send(op);
log.info("123 {}", reply); log.info("123 {}", reply);
@ -224,6 +225,8 @@ public class StateMachine extends BaseStateMachine {
}); });

View File

@ -65,14 +65,15 @@ import static java.nio.charset.StandardCharsets.UTF_8;
@Setter @Setter
public final class StateServer implements Closeable { public final class StateServer implements Closeable {
public static HashMap<PeerId, PeerId> activesPeers = new HashMap<>(); public static HashMap<Peer, Peer> activesPeers = new HashMap<>();
private RaftClient raftClient = null; private RaftClient raftClient = null;
private final RaftServer raftServer; private final RaftServer raftServer;
private final RaftGroup raftGroupConf; private final RaftGroup raftGroupConf;
private final ProcessorServer processorServer; private final ProcessorServer processorServer;
private PeerId peer = null; private Peer peer ;
private int grpcPort;
public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77"); public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77");
@ -89,14 +90,14 @@ public final class StateServer implements Closeable {
//set the port which server listen to in RaftProperty object //set the port which server listen to in RaftProperty object
final int port = NetUtils.createSocketAddr(curpeer.getAddress()).getPort(); final int port = NetUtils.createSocketAddr(curpeer.getAddress()).getPort();
GrpcConfigKeys.Server.setPort(properties, port); GrpcConfigKeys.Server.setPort(properties, port);
log.info("curpeer: {}", curpeer);
//create the counter state machine which hold the counter value //create the counter state machine which hold the counter value
StateMachine stateMachine = new StateMachine(); StateMachine stateMachine = new StateMachine();
raftGroupConf = RaftGroup.valueOf( raftGroupConf = RaftGroup.valueOf(
RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers); RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers);
log.info("raftGroup: {}", raftGroupConf);
//create and start the Raft server //create and start the Raft server
this.raftServer = RaftServer.newBuilder() this.raftServer = RaftServer.newBuilder()
.setGroup(raftGroupConf) .setGroup(raftGroupConf)
@ -104,24 +105,22 @@ public final class StateServer implements Closeable {
.setServerId(curpeer.getId()) .setServerId(curpeer.getId())
.setStateMachine(stateMachine) .setStateMachine(stateMachine)
.build(); .build();
log.info("raftGroup: {}", this.raftServer);
// create RaftClient // create RaftClient
this.processorServer = new ProcessorServer(); this.processorServer = new ProcessorServer();
this.processorServer.getGrpcServer().start();
this.grpcPort = this.processorServer.getGrpcServer().getPort();
} }
// block // block
public void start() throws IOException, InterruptedException { public void start() throws IOException, InterruptedException {
raftServer.start(); raftServer.start();
this.processorServer.getGrpcServer().start();
this.peer = new PeerId(raftServer.getPeer(), this.processorServer.getGrpcServer().getPort());
raftClient = buildClient(raftGroupConf);
this.processorServer.getGrpcServer().awaitTermination(); this.processorServer.getGrpcServer().awaitTermination();
} }

View File

@ -34,7 +34,12 @@ public class StateServerFactory {
} }
public static PeerId CurrentPeerId() { public static void setCurrentPeerId(Peer peer) {
stateServer.setPeer(peer);
}
public static Peer getCurrentPeerId() {
return stateServer.getPeer(); return stateServer.getPeer();
} }

View File

@ -21,7 +21,7 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.HashMap; import java.util.HashMap;
import com.yuandian.dataflow.statemachine.PeerId; import com.yuandian.dataflow.statemachine.Peer;
/** /**
* 代表任务状态 暂时全局使用这个结构. 添加新增状态 * 代表任务状态 暂时全局使用这个结构. 添加新增状态
@ -37,5 +37,5 @@ public class State implements Serializable {
private static final long serialVersionUID = -1L; private static final long serialVersionUID = -1L;
private HashMap<PeerId,WorkerState> workers = new HashMap<>(); private HashMap<Peer,WorkerState> workers = new HashMap<>();
} }

View File

@ -9,7 +9,7 @@ package com.yuandian.dataflow.statemachine.state;
import java.io.Serializable; import java.io.Serializable;
import java.time.Instant; import java.time.Instant;
import com.yuandian.dataflow.statemachine.PeerId; import com.yuandian.dataflow.statemachine.Peer;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -31,7 +31,7 @@ public class WorkerState implements Serializable {
/** /**
* 节点的对应peerID * 节点的对应peerID
*/ */
public PeerId peerId; public Peer peerId;
/** /**
* 任务队列的数量 * 任务队列的数量
*/ */
@ -46,7 +46,7 @@ public class WorkerState implements Serializable {
* 初始化 并构造 updateAt时间 * 初始化 并构造 updateAt时间
* @param peer 传入当前服务的peer * @param peer 传入当前服务的peer
*/ */
public WorkerState(PeerId peer) { public WorkerState(Peer peer) {
this.peerId = peer; this.peerId = peer;
this.updateAt = Instant.now(); this.updateAt = Instant.now();
} }