加入强大的vscode debug

This commit is contained in:
huangsimin 2022-07-30 04:07:30 +08:00
parent ae6abc715c
commit d1429e8114
12 changed files with 217 additions and 121 deletions

42
.vscode/launch.json vendored
View File

@ -6,29 +6,53 @@
"configurations": [
{
"type": "java",
"name": "Launch OperateProcessor",
"name": "Raft-0",
"request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.rpc.OperateProcessor",
"projectName": "dataflow"
"mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow",
"args": [
"0"
],
},
{
"type": "java",
"name": "Launch StateFactory",
"name": "Raft-1",
"request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.StateFactory",
"projectName": "dataflow"
"mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow",
"args": [
"1"
],
},
{
"type": "java",
"name": "Launch Server",
"name": "Raft-2",
"request": "launch",
"mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow",
"args": [
"2"
],
"preLaunchTask": "restart",
"postDebugTask": "stopall"
},
// {
// "type": "java",
// "name": "Launch Server",
// "request": "launch",
// "mainClass": "com.yuandian.dataflow.Server",
// "projectName": "dataflow",
// "args": [
// "2"
// ],
// "preLaunchTask": "restart",
// "postDebugTask": "stopall"
// }
],
"compounds": [
{
"name": "Rafts-Server",
"configurations": ["Raft-0","Raft-1", "Raft-2"]
}
]
}

View File

@ -0,0 +1,108 @@
package com.yuandian.dataflow.controller;
import java.time.Instant;
import java.util.List;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.master.MasterContext;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@MasterRegister
public class MasterProcessor implements MasterExecute {
private final int MAX_TASKS = 100;
@Override
public void loop(MasterContext cxt) {
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
log.debug("master({}) execute {}", StateFactory.getServerId(),
StateFactory.getRaftNode().listAlivePeers());
if (alivePeers == null) {
return;
}
// 读一致性
StateFactory.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
var state = this.getValue();
// log.debug("masterExecute start {} {}", status, alivePeers);
alivePeers.forEach((peer) -> {
if (state == null) {
log.error("readIndexState获取的状态为 {}", state);
return;
}
WorkerState ws = state.getWorkers().get(peer);
if (ws == null) {
log.error("WorkerState获取的状态为 {}", ws);
return;
}
var canTasks = MAX_TASKS - ws.getTaskQueueSize();
log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks);
if (canTasks <= 0) {
return;
}
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(MAX_TASKS);
// 模拟发送包的数据到该节点上
var request = new PacketsRequest();
for (int i = 0; i < canTasks; i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
Operate.CallOperate(
new Operate(OperateType.PUT_WORKERSTATE, ws),
new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("PacketsRequest run {}", status);
try {
StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request,
new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err != null) {
// TODO: 如果错误, 需要让节点恢复任务处理的状态
log.debug("{}", err);
}
log.debug("PacketsRequest: {}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
});
});
}
});
}
}

View File

@ -17,11 +17,11 @@ import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
@ -36,7 +36,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@ProcessorRaft
@WorkerRegister
public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRequest> {
@Setter

View File

@ -7,7 +7,7 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
import lombok.Getter;
import lombok.Setter;
@ -18,7 +18,7 @@ import lombok.extern.slf4j.Slf4j;
* 例子 强制转换leader
*/
@Slf4j
@ProcessorRaft
@WorkerRegister
public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProcessor.LeaderRequest> {
@Setter

View File

@ -18,6 +18,8 @@ import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.master.MasterContext;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.state.State;
@ -40,113 +42,30 @@ import lombok.extern.slf4j.Slf4j;
@Setter
@ToString
public class MasterFactory {
public static final int MAX_TASKS = 100;
public static class MasterContext {
AtomicBoolean isExit = new AtomicBoolean(false);
Object share;
private static MasterExecute masterExecuteCls;
public static void registerMasterLoop(MasterExecute masterExecute) {
if(masterExecuteCls != null) {
log.error("registerMasterLoop is registed");
}
masterExecuteCls = masterExecute;
}
private static MasterContext cxt = new MasterContext();
private static Consumer<MasterContext> masterMainLoop;
public static Thread masterExecute = new Thread(new Runnable() {
public static Thread masterExecuteThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (!cxt.isExit.get()) {
masterMainLoop.accept(cxt);
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
log.debug("master({}) execute {}", StateFactory.getServerId(),
StateFactory.getRaftNode().listAlivePeers());
if (alivePeers != null) {
// 读一致性
StateFactory.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
var state = this.getValue();
// log.debug("masterExecute start {} {}", status, alivePeers);
alivePeers.forEach((peer) -> {
if (state == null) {
log.error("readIndexState获取的状态为 {}", state);
return;
}
WorkerState ws = state.getWorkers().get(peer);
if (ws == null) {
log.error("WorkerState获取的状态为 {}", ws);
return;
}
var canTasks = MAX_TASKS - ws.getTaskQueueSize();
log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks);
if (canTasks <= 0) {
return;
}
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(MAX_TASKS);
// 模拟发送包的数据到该节点上
var request = new PacketsRequest();
for (int i = 0; i < canTasks; i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
Operate.CallOperate(
new Operate(OperateType.PUT_WORKERSTATE, ws),
new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("PacketsRequest run {}", status);
try {
StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request,
new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err != null) {
// TODO: 如果错误, 需要让节点恢复任务处理的状态
log.debug("{}", err);
}
log.debug("PacketsRequest: {}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
});
});
}
});
}
Thread.sleep(2000);
}
} catch (InterruptedException e) {
log.info("{}", e.toString());
MasterContext cxt = new MasterContext();
while (!cxt.getIsExit().get()) {
masterExecuteCls.loop(cxt);
}
}
});
public static Thread getMasterExecute() {
return masterExecute;
return masterExecuteThread;
}
public static void Init() {

View File

@ -46,11 +46,13 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.utils.Utils;
@ -188,7 +190,11 @@ public class StateFactory {
var clsName = traces[traces.length - 1].getClassName();
var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3));
log.info("获取 {} -> {} 下包的所有注解",clsName, packName );
Set<Class<?>> scans = new Reflections(packName).getTypesAnnotatedWith(ProcessorRaft.class);
var refl = new Reflections(packName);
Set<Class<?>> scans = refl.getTypesAnnotatedWith(WorkerRegister.class);
scans.forEach((pRaftClass) -> {
scansMap.put(pRaftClass.getName(), pRaftClass);
});
@ -202,6 +208,18 @@ public class StateFactory {
log.info("{} {}", name, e.toString());
}
});
refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass)->{
try {
MasterExecute execute = (MasterExecute)pClass.getDeclaredConstructor().newInstance();
MasterFactory.registerMasterLoop(execute);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}", e.toString());
}
});;
// 启动集群
node = cluster.start();
@ -220,10 +238,15 @@ public class StateFactory {
public void run(Status status, long index, byte[] reqCtx) {
log.debug("readIndexState({}) {}", getServerId(), status);
if (status.isOk()) {
// 回调失败
closure.success(ss.fsm.getState());
closure.setValue(ss.fsm.getState());
} else {
// 提交同步
}
// 回调失败
closure.run(status);
}
});

View File

@ -90,7 +90,6 @@ public class StateMachine extends StateMachineAdapter {
switch (op.getType()) {
case PUT_WORKERSTATE:
WorkerState ws = op.getValue();
log.debug("PUT {}", ws.peerId);
state.getWorkers().put(ws.peerId, ws);

View File

@ -4,7 +4,7 @@
* @author eson
*2022年7月21日-14:27:49
*/
package com.yuandian.dataflow.statemachine.rpc.annotations;
package com.yuandian.dataflow.statemachine.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@ -19,6 +19,6 @@ import java.lang.annotation.Target;
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface Master {
public @interface MasterRegister {
public int order() default 0 ;
}

View File

@ -4,7 +4,7 @@
* @author eson
*2022年7月21日-14:27:49
*/
package com.yuandian.dataflow.statemachine.rpc.annotations;
package com.yuandian.dataflow.statemachine.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@ -19,5 +19,5 @@ import java.lang.annotation.Target;
*/
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface ProcessorRaft {
public @interface WorkerRegister {
}

View File

@ -0,0 +1,15 @@
package com.yuandian.dataflow.statemachine.master;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
public class MasterContext {
private AtomicBoolean isExit = new AtomicBoolean(false);
private Object share;
}

View File

@ -0,0 +1,8 @@
package com.yuandian.dataflow.statemachine.master;
/**
* Master的主线程循环
*/
public interface MasterExecute {
void loop(MasterContext cxt);
}

View File

@ -13,9 +13,9 @@ import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import javassist.ClassPath;
@ -31,7 +31,7 @@ import lombok.extern.slf4j.Slf4j;
*2022年7月12日-11:10:54
*/
@Slf4j
@ProcessorRaft
@WorkerRegister
public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRequest> {
/**