diff --git a/pom.xml b/pom.xml
index bc60ba6..3454499 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
com.yuandian.dataflow
dataflow
- 1.0-SNAPSHOT
+ 1.0.0-SNAPSHOT
jar
dataflow
@@ -22,10 +22,12 @@
1.46.0
1.7.36
1.3.10
- 2.7.0
+ 2.7.0
3.12.11
2.1.0
1.30
+
+ 1.0.0-SNAPSHOT
@@ -44,6 +46,12 @@
test
+
+ com.yuandian.common
+ config
+ ${yuandian.common.config.version}
+
+
org.slf4j
@@ -77,7 +85,7 @@
org.springframework.boot
spring-boot-starter-web
- ${spring.boot.web}
+ ${spring.boot.version}
@@ -133,14 +141,28 @@
1.18.24
provided
-
-
-
-
-
+
+
+ yuandian-nexus
+ Team Nexus Repository
+ http://mvn.yuandian.com/repository/maven-public
+
+ true
+ always
+
+
+
+
+
+ yuandian-nexus
+ Team Nexus Repository
+ http://mvn.yuandian.com/repository/maven-public
+
+
+
kr.motd.maven
@@ -183,7 +205,7 @@
org.springframework.boot
spring-boot-maven-plugin
- ${spring.boot.web}
+ ${spring.boot.version}
diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java
index 0f2de9f..de77c0c 100644
--- a/src/main/java/com/yuandian/dataflow/Server.java
+++ b/src/main/java/com/yuandian/dataflow/Server.java
@@ -1,6 +1,7 @@
package com.yuandian.dataflow;
+import com.yuandian.common.Config;
import com.yuandian.dataflow.statemachine.RaftClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
@@ -69,6 +70,9 @@ public class Server {
public static void main(String[] args) {
+
+
+
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
diff --git a/src/main/java/com/yuandian/dataflow/config/Config.java b/src/main/java/com/yuandian/dataflow/config/Config.java
deleted file mode 100644
index 177e182..0000000
--- a/src/main/java/com/yuandian/dataflow/config/Config.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * description
- *
- * @author eson
- *2022年6月13日-17:08:46
- */
-package com.yuandian.dataflow.config;
-
-
-
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
-
-import org.yaml.snakeyaml.Yaml;
-
-import com.alibaba.nacos.api.NacosFactory;
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.listener.Listener;
-import com.alibaba.nacos.api.exception.NacosException;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * description nacos配置.
- *
- * @author eson
- *2022年6月13日-17:08:46
- */
-@Slf4j
-@Getter
-@Setter
-public class Config {
-
- public static String DEFAULT_CONFIT_FILE = "application.properties";
- public static String DEFAULT_CONFIG_ADDR = "yuandian.dataflow.config.nacos.server.addr";
- // public static String DEFAULT_CONFIG_DATAID = "yuandian.dataflow.config.nacos.dataid";
- // public static String DEFAULT_CONFIG_GROUP = "yuandian.dataflow.config.nacos.group";+
-
- // 默认
- public static String DEFAULT_GROUP_DATAID = "yuandian.dataflow";
-
- // 所有生成的nacos客户端
- private static HashMap configDict = new HashMap<>();
-
- // 配置的所有值主类
- public Map data;
- // nacos地址
- public String serverAddr ;
- // nacos dataId
- public String dataId ;
- // nacos group
- public String group ;
- // 线程安全配置锁
- private Lock datalock;
-
-
- // nacos 客户端类
- private ConfigService configService;
-
- private Config(String GroupAndDataId) throws Exception {
- String[] gad = GroupAndDataId.split("\\.");
- if(gad.length != 2) {
- throw new Exception("Group 或者 DataId 不能存在 '.' 的命令");
- }
- this.group = gad[0] + ENV_TEST;
- this.dataId = gad[1] + ENV_TEST;
- connect();
- }
-
- /**
- * 连接nacos
- * @throws IOException
- * @throws NacosException
- */
- private void connect() throws IOException, NacosException {
-
- if(configService != null) {
- configService.shutDown();
- configService = null;
- }
-
- // 获取 app
- Properties prop = new Properties();
- prop.load(Config.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIT_FILE));
- serverAddr = prop.getProperty(DEFAULT_CONFIG_ADDR + ENV_TEST).trim();
- // dataId = prop.getProperty(DEFAULT_CONFIG_DATAID).trim();
- // group = prop.getProperty(DEFAULT_CONFIG_GROUP).trim();
-
- Properties properties = new Properties();
- properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
- configService = NacosFactory.createConfigService(properties);
-
- String content = configService.getConfig(dataId, group, 5000);
- Yaml yaml = new Yaml();
- data = yaml.load(content);
- log.info(content);
-
- datalock = new ReentrantLock();
- // 监听 配置更新事件
- configService.addListener(dataId, group, new Listener() {
- @Override
- public void receiveConfigInfo(String configInfo) {
- log.debug("recieve:" + configInfo);
- try {
- datalock.lock();
- data = (Map)new Yaml().load(configInfo);
- log.debug("{}",data);
- } finally {
- datalock.unlock();
- }
- }
-
- @Override
- public Executor getExecutor() {
- return null;
- }
- });
-
-
- }
-
-
- /**
- * 根据多个key获取yaml的值 keys 路径
- * @param keys 获取的key值
- * @return
- */
- public Object get(String ...keys) {
- var cur = data;
- for(var i = 0; i < keys.length - 1;i++ ) {
- var key = keys[i];
- cur = (Map) cur.get(key);
- }
- return cur.get(keys[keys.length - 1]);
- }
-
- /**
- *
- * 用于定位keys的路径后的操作. 创建keys后赋值. 如果存在keys, 可以直接赋值, 不存在则直接报错
- *
- * @author eson
- *2022年6月15日-下午12:06:37
- */
- public class Operator {
- Config config;
- String[] keys;
-
- Operator(Config config, String[] keys) {
- this.config = config;
- this.keys = keys;
- }
-
- /**
- * 创建seek的key
- * @return
- */
- public Operator createKeys() {
- var cur = config.data;
- for(var i = 0; i < keys.length;i++ ) {
- var key = keys[i];
- var vobj = cur.get(key);
- if (vobj == null) {
- vobj = new LinkedHashMap<>();
- cur.put(key, vobj);
- }
- cur = (Map) vobj;
- }
- return this;
- }
-
- /**
- * 定位后赋值
- * @param value
- */
- public void set(Object value) {
- var cur = config.data;
- for(var i = 0; i < keys.length - 1;i++ ) {
- var key = keys[i];
- cur = (Map) cur.get(key);
- }
- cur.put(keys[keys.length - 1], value);
- }
- }
-
- /**
- * 定位. eg. seek("key1", "key2")
- * @param keys
- * @return
- */
- public Operator seek(String ...keys) {
- return new Operator(this, keys);
- }
-
-
-
- /**
- * 删除 key的值. 类型map操作. keys是一个路径 remove("a","b") --> {"a": {"b": 1}} 删除b
- * @param keys
- * @return
- */
- public Object remove(String ...keys) {
- var cur = data;
- for(var i = 0; i < keys.length - 1;i++ ) {
- var key = keys[i];
- cur = (Map) cur.get(key);
- }
- return cur.remove(keys[keys.length - 1]);
- }
-
- /**
- * 更新配置
- * @return 返回是否发布成功.
- * @throws NacosException
- */
- public Boolean update() throws NacosException {
- return configService.publishConfig(dataId, group,new Yaml().dumpAsMap(data));
- }
-
- public static String ENV_TEST = "";
-
- /**
- * 统一使用配置的入口函数 线程安全
- * @param GroupAndDataID 使用配置的标签 eg."group.dataId"
- * @param execute 匿名函数
- * @return 函数返回的值, 如果不需要直接返回null
- * @throws Exception
- */
- public static Object UseConfig(String GroupAndDataID, Function execute) throws Exception {
-
- Config cnf;
- synchronized(configDict) {
- cnf = configDict.get(GroupAndDataID);
- if(cnf == null) {
- cnf = new Config(GroupAndDataID);
- configDict.put(GroupAndDataID, cnf);
- }
- }
-
- try {
- cnf.datalock.lock();
- var res = execute.apply(cnf);
- return res;
- } catch (Exception e) {
- throw e;
- } finally {
- cnf.datalock.unlock();
- }
- }
-
- /**
- * 统一使用配置的入口函数 线程安全
- * @param execute 统一使用配置的入口匿名方法
- * @return 函数返回的值, 如果不需要直接返回null
- * @throws Exception
- */
- public static Object UseConfig(Function execute) throws Exception {
- return UseConfig(DEFAULT_GROUP_DATAID, execute);
- }
-}
diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
index 48bd3ea..99191d7 100644
--- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
+++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
@@ -11,6 +11,7 @@ import java.time.Instant;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
+import com.yuandian.common.Config;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
@@ -37,7 +38,15 @@ import lombok.extern.slf4j.Slf4j;
public class CollectPackets extends CollectPacketsServerImplBase {
- public static void main(String[] args) throws InvalidProtocolBufferException {
+ public static void main(String[] args) throws Exception {
+
+ Config.UseConfig( (cnf) -> {
+ var tid = cnf.get("test_id");
+ log.info("{}",tid);
+ return null;
+ });
+
+
// private final ManagedChannelBuilder> managedChannelBuilder;
// private final CollectPacketsServerStub blockingStub;
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index f20b366..45b99ac 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,6 +1,6 @@
server.port=3440
-yuandian.dataflow.config.nacos.server.addr="192.168.1.113:8848"
-yuandian.dataflow.config.nacos.dataid=dataflow
-yuandian.dataflow.config.nacos.group=yuandian
+
+yuandian.dataflow.config.nacos.server.addr=192.168.1.113:8848 # 不能 "为结尾"
+
\ No newline at end of file
diff --git a/src/test/java/com/yuandian/dataflow/config/ConfigTest.java b/src/test/java/com/yuandian/dataflow/config/ConfigTest.java
deleted file mode 100644
index f7df421..0000000
--- a/src/test/java/com/yuandian/dataflow/config/ConfigTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package com.yuandian.dataflow.config;
-
-import java.io.IOException;
-import java.time.Instant;
-
-import org.junit.Assert;
-import org.junit.FixMethodOrder;
-import org.junit.experimental.theories.suppliers.TestedOn;
-import org.junit.jupiter.api.Order;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestMethodOrder;
-
-import com.alibaba.nacos.api.exception.NacosException;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class ConfigTest {
-
-
- /**
- * 测试配置基础用法
- * @throws Exception
- */
- @Test
- @Order(1)
- void testUseConfig() throws Exception {
- Config.ENV_TEST = "-test";
-
- Config.UseConfig((cnf) -> {
- log.info("{}",cnf.data);
- Assert.assertEquals(cnf.get("key1", "key2"), "key_path");
- Instant now = Instant.now();
- cnf.data.put("use_config", now.toString());
- try {
- log.info("{}",cnf.update());
- } catch (NacosException e) {
- e.printStackTrace();
- }
- return null;
- });
- }
-
- @Test
- @Order(2)
- void testRemove() throws Exception {
- Config.ENV_TEST = "-test";
-
- Config.UseConfig((cnf) -> {
- cnf.remove("create1", "create2");
- try {
- log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update
- } catch (NacosException e) {
- e.printStackTrace();
- }
-
- cnf.remove("create1");
- try {
- log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update
- } catch (NacosException e) {
- e.printStackTrace();
- }
- return null;
- });
- }
-
-
- @Test
- @Order(1)
- void testCreateKeys() throws Exception {
- Config.ENV_TEST = "-test";
-
- Config.UseConfig((cnf) -> {
- log.info("{}",cnf.data);
- cnf.seek("create1", "create2", "create3").createKeys().set(Instant.now().toString());;
- try {
- log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update
- } catch (NacosException e) {
- e.printStackTrace();
- }
- return null;
- });
- }
-
-
-
- @Test
- void testUpdate() throws Exception {
- Config.ENV_TEST = "-test";
-
- Config.UseConfig((cnf) -> {
- Instant now = Instant.now();
- cnf.data.put("use_config", now.toString());
- try {
- log.info("{}",cnf.update());
- } catch (NacosException e) {
- e.printStackTrace();
- }
- return null;
- });
- }
-
- @Test
- void testLabelConfig() throws Exception {
- Config.UseConfig("org.fortest", (cnf)->{
- log.info("{}", cnf.get("test"));
- Assert.assertEquals(cnf.get("test"), "groupAndDataId");
- return null;
- });
- }
-}