This commit is contained in:
2022-06-06 01:02:55 +08:00
parent 911a7883ce
commit 3dadd8c4e8
7 changed files with 323 additions and 67 deletions

View File

@@ -4,13 +4,16 @@ import java.nio.ByteBuffer;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.entity.Task;
import com.google.gson.JsonObject;
import com.yuandian.dataflow.Server;
import com.yuandian.dataflow.entity.Response;
import com.yuandian.dataflow.statemachine.RaftClosure;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ObjectUtils.Null;
import org.apache.commons.logging.Log;
import org.apache.ratis.thirdparty.org.checkerframework.common.reflection.qual.GetMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
@@ -22,14 +25,16 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import com.alipay.sofa.jraft.Node;
@Slf4j
@Controller
public class TaskLog {
private static final Logger LOG = LoggerFactory.getLogger(TaskLog.class);
// private static final Logger log = LoggerFactory.getLogger(TaskLog.class);
private static Node node = Server.GetNode();
@GetMapping(path = "/test")
public ResponseEntity<String> greeting() {
public ResponseEntity<Response> Processing(@RequestBody int status) {
Task task = new Task();
@@ -37,7 +42,7 @@ public class TaskLog {
LOG.error(node.toString());
log.error(node.toString());
@@ -49,9 +54,18 @@ public class TaskLog {
JsonObject response = new JsonObject();
response.addProperty("status", "success");
response.addProperty("apply", "hello");
return new ResponseEntity<String>(response.toString(), HttpStatus.OK);
Response response = new Response();
response.code = HttpStatus.OK;
response.message = HttpStatus.OK.toString();
return new ResponseEntity<Response>(response, HttpStatus.OK);
}
@GetMapping(path = "/test2")
public ResponseEntity<Response> MongodbTest(@RequestBody int status) {
Response response = new Response();
return new ResponseEntity<Response>(response, HttpStatus.OK);
}
}

View File

@@ -0,0 +1,48 @@
package com.yuandian.dataflow.entity;
import java.io.Serializable;
import java.lang.annotation.Documented;
import java.time.LocalDateTime;
import org.bson.Document;
import org.bson.codecs.pojo.annotations.BsonCreator;
import org.bson.codecs.pojo.annotations.BsonDiscriminator;
import org.bson.codecs.pojo.annotations.BsonProperty;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.util.JSONPObject;
import lombok.Getter;
import lombok.Setter;
@BsonDiscriminator
@Getter
@Setter
public final class Doc extends Document {
@JsonProperty("code")
@BsonProperty("code")
public int Code ;
@JsonProperty("ts")
@BsonProperty("ts")
public LocalDateTime TS;
@JsonProperty("desc")
@BsonProperty("desc")
public String Desc;
@JsonProperty("data")
@BsonProperty("data")
public Document Data;
}

View File

@@ -0,0 +1,21 @@
package com.yuandian.dataflow.entity;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.Setter;
import org.springframework.http.HttpStatus;
@Getter
@Setter
public class Response {
@JsonProperty("code")
public HttpStatus code;
@JsonProperty("message")
public String message ;
@JsonProperty("data")
public Object data ;
}

View File

@@ -1 +1,4 @@
server.port=3440
server.port=3440

View File

@@ -0,0 +1,13 @@
<configuration>
<appender name="CONSOLE"
class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE" />
</root>
</configuration>

View File

@@ -1,38 +1,193 @@
package com.yuandian.dataflow;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.swing.text.AbstractDocument.BranchElement;
import org.bson.Document;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.expression.spel.ast.FunctionReference;
import com.mongodb.MongoClient;
import com.mongodb.client.model.InsertManyOptions;
import com.yuandian.dataflow.entity.Doc;
import io.netty.handler.codec.dns.DatagramDnsQuery;
import lombok.Cleanup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* Unit test for simple App.
*/
public class AppTest
extends TestCase
{
/**
* Create the test case
*
* @param testName name of the test case
*/
public AppTest( String testName )
{
super( testName );
@DisplayName("AppTest")
@Slf4j
public class AppTest {
@FunctionalInterface
public interface FuncReturn {
public float Execute();
}
/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( AppTest.class );
public class Metric {
public class ConditionExecute {
}
public class Element {
public Duration Costtime;
public float Value;
public Element(Duration costtime, float value) {
Costtime = costtime;
Value = value;
}
}
private LinkedBlockingQueue<Element> dataBlockQueue;
private Queue<Element> countQueue;
AtomicBoolean Running = new AtomicBoolean(true);
public Metric() {
dataBlockQueue = new LinkedBlockingQueue<Element>();
countQueue = new LinkedList<Element>();
}
private Thread countWorker = new Thread(() -> {
while (Running.get()) {
try {
countQueue.add(dataBlockQueue.take());
} catch (InterruptedException e1) {
e1.printStackTrace();
Runtime.getRuntime().exit(0);
}
if (countQueue.size() >= 24) {
Duration Cost = null;
float Total = 0;
var iter = countQueue.iterator();
while (iter.hasNext()) {
var e = iter.next();
if (Cost == null) {
Cost = e.Costtime;
} else {
Cost.plus(e.Costtime);
}
Total += e.Value;
}
log.info("Metric","qps: {}/s", (Total * 1000000000) / Cost.toNanos() );
countQueue.poll();
}
}
});
public void start() {
if (!countWorker.isAlive()) {
countWorker.start();
}
}
public void close() {
Running.set(false);
}
public void push(FuncReturn exec) {
Instant now = Instant.now();
var v = exec.Execute();
dataBlockQueue.add(new Element(Duration.between(now, Instant.now()), v));
}
}
/**
* Rigourous Test :-)
*/
public void testApp()
{
assertTrue( true );
@Test
public void Mongodb() throws InterruptedException {
ArrayList<Thread> execs = new ArrayList<>();
final Metric metric = new Metric();
metric.start();
for (int c = 0; c < 10; c++) {
Thread exec = new Thread(() -> {
@Cleanup
MongoClient mgo = new MongoClient("localhost", 27017);
log.info("msg");
long LoopNumber = 5;
long BatchSize = 20000;
var db = mgo.getDatabase("yuandian");
var cltdoc = db.getCollection("doc");
for (int n = 0; n < LoopNumber; n++) {
metric.push(() -> {
List<Doc> documents = new ArrayList<>();
Random r = new Random();
for (int i = 0; i < BatchSize; i++) {
var doc = new Doc();
var datadoc = new Document();
doc.append("code", r.nextInt(100));
doc.append("desc", "desc");
doc.append("ts", Instant.now());
for (int ii = 0; ii < 24; ii++) {
UUID uid = UUID.randomUUID();
datadoc
.append(uid.toString(), uid.toString());
}
doc.append("data", datadoc);
documents.add(doc);
}
var opt = new InsertManyOptions();
cltdoc.insertMany(documents, opt);
return BatchSize;
});
}
});
exec.start();
execs.add(exec);
}
;
execs.forEach((e) -> {
try {
e.join();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
metric.close();
}
}