From 1f482594cb8561877df88a0f87641ba187555880 Mon Sep 17 00:00:00 2001 From: eson Date: Fri, 17 Jun 2022 18:48:10 +0800 Subject: [PATCH] init --- pom.xml | 40 ++- .../java/com/yuandian/dataflow/Server.java | 4 + .../com/yuandian/dataflow/config/Config.java | 270 ------------------ .../dataflow/grpc/CollectPackets.java | 11 +- src/main/resources/application.properties | 6 +- .../yuandian/dataflow/config/ConfigTest.java | 111 ------- 6 files changed, 48 insertions(+), 394 deletions(-) delete mode 100644 src/main/java/com/yuandian/dataflow/config/Config.java delete mode 100644 src/test/java/com/yuandian/dataflow/config/ConfigTest.java diff --git a/pom.xml b/pom.xml index bc60ba6..3454499 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.yuandian.dataflow dataflow - 1.0-SNAPSHOT + 1.0.0-SNAPSHOT jar dataflow @@ -22,10 +22,12 @@ 1.46.0 1.7.36 1.3.10 - 2.7.0 + 2.7.0 3.12.11 2.1.0 1.30 + + 1.0.0-SNAPSHOT @@ -44,6 +46,12 @@ test + + com.yuandian.common + config + ${yuandian.common.config.version} + + org.slf4j @@ -77,7 +85,7 @@ org.springframework.boot spring-boot-starter-web - ${spring.boot.web} + ${spring.boot.version} @@ -133,14 +141,28 @@ 1.18.24 provided - - - - - + + + yuandian-nexus + Team Nexus Repository + http://mvn.yuandian.com/repository/maven-public + + true + always + + + + + + yuandian-nexus + Team Nexus Repository + http://mvn.yuandian.com/repository/maven-public + + + kr.motd.maven @@ -183,7 +205,7 @@ org.springframework.boot spring-boot-maven-plugin - ${spring.boot.web} + ${spring.boot.version} diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 0f2de9f..de77c0c 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -1,6 +1,7 @@ package com.yuandian.dataflow; +import com.yuandian.common.Config; import com.yuandian.dataflow.statemachine.RaftClosure; import com.yuandian.dataflow.statemachine.StateMachine; @@ -69,6 +70,9 @@ public class Server { public static void main(String[] args) { + + + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; String[] sprPeers = new String[]{"3440","3441","3442"}; diff --git a/src/main/java/com/yuandian/dataflow/config/Config.java b/src/main/java/com/yuandian/dataflow/config/Config.java deleted file mode 100644 index 177e182..0000000 --- a/src/main/java/com/yuandian/dataflow/config/Config.java +++ /dev/null @@ -1,270 +0,0 @@ -/** - * description - * - * @author eson - *2022年6月13日-17:08:46 - */ -package com.yuandian.dataflow.config; - - - - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Function; - -import org.yaml.snakeyaml.Yaml; - -import com.alibaba.nacos.api.NacosFactory; -import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.config.ConfigService; -import com.alibaba.nacos.api.config.listener.Listener; -import com.alibaba.nacos.api.exception.NacosException; - -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; - -/** - * description nacos配置. - * - * @author eson - *2022年6月13日-17:08:46 - */ -@Slf4j -@Getter -@Setter -public class Config { - - public static String DEFAULT_CONFIT_FILE = "application.properties"; - public static String DEFAULT_CONFIG_ADDR = "yuandian.dataflow.config.nacos.server.addr"; - // public static String DEFAULT_CONFIG_DATAID = "yuandian.dataflow.config.nacos.dataid"; - // public static String DEFAULT_CONFIG_GROUP = "yuandian.dataflow.config.nacos.group";+ - - // 默认 - public static String DEFAULT_GROUP_DATAID = "yuandian.dataflow"; - - // 所有生成的nacos客户端 - private static HashMap configDict = new HashMap<>(); - - // 配置的所有值主类 - public Map data; - // nacos地址 - public String serverAddr ; - // nacos dataId - public String dataId ; - // nacos group - public String group ; - // 线程安全配置锁 - private Lock datalock; - - - // nacos 客户端类 - private ConfigService configService; - - private Config(String GroupAndDataId) throws Exception { - String[] gad = GroupAndDataId.split("\\."); - if(gad.length != 2) { - throw new Exception("Group 或者 DataId 不能存在 '.' 的命令"); - } - this.group = gad[0] + ENV_TEST; - this.dataId = gad[1] + ENV_TEST; - connect(); - } - - /** - * 连接nacos - * @throws IOException - * @throws NacosException - */ - private void connect() throws IOException, NacosException { - - if(configService != null) { - configService.shutDown(); - configService = null; - } - - // 获取 app - Properties prop = new Properties(); - prop.load(Config.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIT_FILE)); - serverAddr = prop.getProperty(DEFAULT_CONFIG_ADDR + ENV_TEST).trim(); - // dataId = prop.getProperty(DEFAULT_CONFIG_DATAID).trim(); - // group = prop.getProperty(DEFAULT_CONFIG_GROUP).trim(); - - Properties properties = new Properties(); - properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); - configService = NacosFactory.createConfigService(properties); - - String content = configService.getConfig(dataId, group, 5000); - Yaml yaml = new Yaml(); - data = yaml.load(content); - log.info(content); - - datalock = new ReentrantLock(); - // 监听 配置更新事件 - configService.addListener(dataId, group, new Listener() { - @Override - public void receiveConfigInfo(String configInfo) { - log.debug("recieve:" + configInfo); - try { - datalock.lock(); - data = (Map)new Yaml().load(configInfo); - log.debug("{}",data); - } finally { - datalock.unlock(); - } - } - - @Override - public Executor getExecutor() { - return null; - } - }); - - - } - - - /** - * 根据多个key获取yaml的值 keys 路径 - * @param keys 获取的key值 - * @return - */ - public Object get(String ...keys) { - var cur = data; - for(var i = 0; i < keys.length - 1;i++ ) { - var key = keys[i]; - cur = (Map) cur.get(key); - } - return cur.get(keys[keys.length - 1]); - } - - /** - * - * 用于定位keys的路径后的操作. 创建keys后赋值. 如果存在keys, 可以直接赋值, 不存在则直接报错 - * - * @author eson - *2022年6月15日-下午12:06:37 - */ - public class Operator { - Config config; - String[] keys; - - Operator(Config config, String[] keys) { - this.config = config; - this.keys = keys; - } - - /** - * 创建seek的key - * @return - */ - public Operator createKeys() { - var cur = config.data; - for(var i = 0; i < keys.length;i++ ) { - var key = keys[i]; - var vobj = cur.get(key); - if (vobj == null) { - vobj = new LinkedHashMap<>(); - cur.put(key, vobj); - } - cur = (Map) vobj; - } - return this; - } - - /** - * 定位后赋值 - * @param value - */ - public void set(Object value) { - var cur = config.data; - for(var i = 0; i < keys.length - 1;i++ ) { - var key = keys[i]; - cur = (Map) cur.get(key); - } - cur.put(keys[keys.length - 1], value); - } - } - - /** - * 定位. eg. seek("key1", "key2") - * @param keys - * @return - */ - public Operator seek(String ...keys) { - return new Operator(this, keys); - } - - - - /** - * 删除 key的值. 类型map操作. keys是一个路径 remove("a","b") --> {"a": {"b": 1}} 删除b - * @param keys - * @return - */ - public Object remove(String ...keys) { - var cur = data; - for(var i = 0; i < keys.length - 1;i++ ) { - var key = keys[i]; - cur = (Map) cur.get(key); - } - return cur.remove(keys[keys.length - 1]); - } - - /** - * 更新配置 - * @return 返回是否发布成功. - * @throws NacosException - */ - public Boolean update() throws NacosException { - return configService.publishConfig(dataId, group,new Yaml().dumpAsMap(data)); - } - - public static String ENV_TEST = ""; - - /** - * 统一使用配置的入口函数 线程安全 - * @param GroupAndDataID 使用配置的标签 eg."group.dataId" - * @param execute 匿名函数 - * @return 函数返回的值, 如果不需要直接返回null - * @throws Exception - */ - public static Object UseConfig(String GroupAndDataID, Function execute) throws Exception { - - Config cnf; - synchronized(configDict) { - cnf = configDict.get(GroupAndDataID); - if(cnf == null) { - cnf = new Config(GroupAndDataID); - configDict.put(GroupAndDataID, cnf); - } - } - - try { - cnf.datalock.lock(); - var res = execute.apply(cnf); - return res; - } catch (Exception e) { - throw e; - } finally { - cnf.datalock.unlock(); - } - } - - /** - * 统一使用配置的入口函数 线程安全 - * @param execute 统一使用配置的入口匿名方法 - * @return 函数返回的值, 如果不需要直接返回null - * @throws Exception - */ - public static Object UseConfig(Function execute) throws Exception { - return UseConfig(DEFAULT_GROUP_DATAID, execute); - } -} diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java index 48bd3ea..99191d7 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java @@ -11,6 +11,7 @@ import java.time.Instant; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; +import com.yuandian.common.Config; import com.yuandian.dataflow.proto.CollectPacketsServerGrpc; @@ -37,7 +38,15 @@ import lombok.extern.slf4j.Slf4j; public class CollectPackets extends CollectPacketsServerImplBase { - public static void main(String[] args) throws InvalidProtocolBufferException { + public static void main(String[] args) throws Exception { + + Config.UseConfig( (cnf) -> { + var tid = cnf.get("test_id"); + log.info("{}",tid); + return null; + }); + + // private final ManagedChannelBuilder managedChannelBuilder; // private final CollectPacketsServerStub blockingStub; diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index f20b366..45b99ac 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,6 +1,6 @@ server.port=3440 -yuandian.dataflow.config.nacos.server.addr="192.168.1.113:8848" -yuandian.dataflow.config.nacos.dataid=dataflow -yuandian.dataflow.config.nacos.group=yuandian + +yuandian.dataflow.config.nacos.server.addr=192.168.1.113:8848 # 不能 "为结尾" + \ No newline at end of file diff --git a/src/test/java/com/yuandian/dataflow/config/ConfigTest.java b/src/test/java/com/yuandian/dataflow/config/ConfigTest.java deleted file mode 100644 index f7df421..0000000 --- a/src/test/java/com/yuandian/dataflow/config/ConfigTest.java +++ /dev/null @@ -1,111 +0,0 @@ -package com.yuandian.dataflow.config; - -import java.io.IOException; -import java.time.Instant; - -import org.junit.Assert; -import org.junit.FixMethodOrder; -import org.junit.experimental.theories.suppliers.TestedOn; -import org.junit.jupiter.api.Order; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestMethodOrder; - -import com.alibaba.nacos.api.exception.NacosException; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class ConfigTest { - - - /** - * 测试配置基础用法 - * @throws Exception - */ - @Test - @Order(1) - void testUseConfig() throws Exception { - Config.ENV_TEST = "-test"; - - Config.UseConfig((cnf) -> { - log.info("{}",cnf.data); - Assert.assertEquals(cnf.get("key1", "key2"), "key_path"); - Instant now = Instant.now(); - cnf.data.put("use_config", now.toString()); - try { - log.info("{}",cnf.update()); - } catch (NacosException e) { - e.printStackTrace(); - } - return null; - }); - } - - @Test - @Order(2) - void testRemove() throws Exception { - Config.ENV_TEST = "-test"; - - Config.UseConfig((cnf) -> { - cnf.remove("create1", "create2"); - try { - log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update - } catch (NacosException e) { - e.printStackTrace(); - } - - cnf.remove("create1"); - try { - log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update - } catch (NacosException e) { - e.printStackTrace(); - } - return null; - }); - } - - - @Test - @Order(1) - void testCreateKeys() throws Exception { - Config.ENV_TEST = "-test"; - - Config.UseConfig((cnf) -> { - log.info("{}",cnf.data); - cnf.seek("create1", "create2", "create3").createKeys().set(Instant.now().toString());; - try { - log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update - } catch (NacosException e) { - e.printStackTrace(); - } - return null; - }); - } - - - - @Test - void testUpdate() throws Exception { - Config.ENV_TEST = "-test"; - - Config.UseConfig((cnf) -> { - Instant now = Instant.now(); - cnf.data.put("use_config", now.toString()); - try { - log.info("{}",cnf.update()); - } catch (NacosException e) { - e.printStackTrace(); - } - return null; - }); - } - - @Test - void testLabelConfig() throws Exception { - Config.UseConfig("org.fortest", (cnf)->{ - log.info("{}", cnf.get("test")); - Assert.assertEquals(cnf.get("test"), "groupAndDataId"); - return null; - }); - } -}