TODO: finish

This commit is contained in:
eson 2022-06-15 18:31:05 +08:00
parent b7a5ea5570
commit 2de2796819
3 changed files with 315 additions and 176 deletions

View File

@ -191,7 +191,6 @@
</goals> </goals>
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>

View File

@ -9,23 +9,16 @@ package com.yuandian.dataflow.config;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.util.HashMap;
import java.io.StringBufferInputStream; import java.util.LinkedHashMap;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.system.SystemProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.Yaml;
import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.NacosFactory;
@ -33,14 +26,13 @@ import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.exception.NacosException;
import com.google.api.client.http.InputStreamContent;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
* description * description nacos配置.
* *
* @author eson * @author eson
*2022年6月13日-17:08:46 *2022年6月13日-17:08:46
@ -50,30 +42,60 @@ import lombok.extern.slf4j.Slf4j;
@Setter @Setter
public class Config { public class Config {
public static String DEFAULT_CONFIT_FILE = "application.properties";
public static String DEFAULT_CONFIG_ADDR = "yuandian.dataflow.config.nacos.server.addr";
// public static String DEFAULT_CONFIG_DATAID = "yuandian.dataflow.config.nacos.dataid";
// public static String DEFAULT_CONFIG_GROUP = "yuandian.dataflow.config.nacos.group";+
public Map<String,Object> configMap; // 默认
public static String DEFAULT_GROUP_DATAID = "yuandian.dataflow";
// 所有生成的nacos客户端
private static HashMap<String,Config> configDict = new HashMap<>();
@Value("${yuandian.dataflow.config.nacos.server.addr}") // 配置的所有值主类
public Map<String,Object> data;
// nacos地址
public String serverAddr ; public String serverAddr ;
// nacos dataId
public String dataId ; public String dataId ;
// nacos group
public String group ; public String group ;
// 线程安全配置锁
private Lock datalock;
private static Config config;
// nacos 客户端类
private ConfigService configService; private ConfigService configService;
private Config() throws NacosException, IOException { private Config(String GroupAndDataId) throws Exception {
String[] gad = GroupAndDataId.split("\\.");
if(gad.length != 2) {
throw new Exception("Group 或者 DataId 不能存在 '.' 的命令");
}
this.group = gad[0] + ENV_TEST;
this.dataId = gad[1] + ENV_TEST;
connect();
}
/**
* 连接nacos
* @throws IOException
* @throws NacosException
*/
private void connect() throws IOException, NacosException {
if(configService != null) {
configService.shutDown();
configService = null;
}
// 获取 app
Properties prop = new Properties(); Properties prop = new Properties();
prop.load(Config.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIT_FILE));
serverAddr = prop.getProperty(DEFAULT_CONFIG_ADDR + ENV_TEST).trim();
// dataId = prop.getProperty(DEFAULT_CONFIG_DATAID).trim();
serverAddr = prop.getProperty("yuandian.dataflow.config.nacos.server.addr"); // group = prop.getProperty(DEFAULT_CONFIG_GROUP).trim();
dataId = prop.getProperty("yuandian.dataflow.config.nacos.dataid") ;
group = prop.getProperty("yuandian.dataflow.config.nacos.group");
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
@ -81,16 +103,21 @@ public class Config {
String content = configService.getConfig(dataId, group, 5000); String content = configService.getConfig(dataId, group, 5000);
Yaml yaml = new Yaml(); Yaml yaml = new Yaml();
configMap = yaml.load(content); data = yaml.load(content);
log.info(content); log.info(content);
datalock = new ReentrantLock();
// 监听 配置更新事件
configService.addListener(dataId, group, new Listener() { configService.addListener(dataId, group, new Listener() {
@Override @Override
public void receiveConfigInfo(String configInfo) { public void receiveConfigInfo(String configInfo) {
System.out.println("recieve:" + configInfo); log.debug("recieve:" + configInfo);
synchronized(configMap){ try {
configMap = (Map<String, Object>)new Yaml().load(configInfo); datalock.lock();
System.out.println(configMap); data = (Map<String, Object>)new Yaml().load(configInfo);
log.debug("{}",data);
} finally {
datalock.unlock();
} }
} }
@ -100,20 +127,95 @@ public class Config {
} }
}); });
// boolean isPublishOk = configService.publishConfig(dataId, group, Prop.toString());
// System.out.println(isPublishOk);
// Thread.sleep(3000); }
// content = configService.getConfig(dataId, group, 5000);
// System.out.println(content);
// boolean isRemoveOk = configService.removeConfig(dataId, group);
// System.out.println(isRemoveOk);
// Thread.sleep(3000);
// content = configService.getConfig(dataId, group, 5000); /**
// System.out.println(content); * 根据多个key获取yaml的值 keys 路径
// Thread.sleep(300000); * @param keys 获取的key值
* @return
*/
public Object get(String ...keys) {
var cur = data;
for(var i = 0; i < keys.length - 1;i++ ) {
var key = keys[i];
cur = (Map<String, Object>) cur.get(key);
}
return cur.get(keys[keys.length - 1]);
}
/**
*
* 用于定位keys的路径后的操作. 创建keys后赋值. 如果存在keys, 可以直接赋值, 不存在则直接报错
*
* @author eson
*2022年6月15日-下午12:06:37
*/
public class Operator {
Config config;
String[] keys;
Operator(Config config, String[] keys) {
this.config = config;
this.keys = keys;
}
/**
* 创建seek的key
* @return
*/
public Operator createKeys() {
var cur = config.data;
for(var i = 0; i < keys.length;i++ ) {
var key = keys[i];
var vobj = cur.get(key);
if (vobj == null) {
vobj = new LinkedHashMap<>();
cur.put(key, vobj);
}
cur = (Map<String, Object>) vobj;
}
return this;
}
/**
* 定位后赋值
* @param value
*/
public void set(Object value) {
var cur = config.data;
for(var i = 0; i < keys.length - 1;i++ ) {
var key = keys[i];
cur = (Map<String, Object>) cur.get(key);
}
cur.put(keys[keys.length - 1], value);
}
}
/**
* 定位. eg. seek("key1", "key2")
* @param keys
* @return
*/
public Operator seek(String ...keys) {
return new Operator(this, keys);
}
/**
* 删除 key的值. 类型map操作. keys是一个路径 remove("a","b") --> {"a": {"b": 1}} 删除b
* @param keys
* @return
*/
public Object remove(String ...keys) {
var cur = data;
for(var i = 0; i < keys.length - 1;i++ ) {
var key = keys[i];
cur = (Map<String, Object>) cur.get(key);
}
return cur.remove(keys[keys.length - 1]);
} }
/** /**
@ -122,120 +224,47 @@ public class Config {
* @throws NacosException * @throws NacosException
*/ */
public Boolean update() throws NacosException { public Boolean update() throws NacosException {
return configService.publishConfig(dataId, group,new Yaml().dumpAsMap(configMap)); return configService.publishConfig(dataId, group,new Yaml().dumpAsMap(data));
} }
public static String ENV_TEST = "";
/** /**
* * 统一使用配置的入口函数 线程安全
* @param execute 统一使用配置的入口 * @param GroupAndDataID 使用配置的标签 eg."group.dataId"
* @param execute 匿名函数
* @return 函数返回的值, 如果不需要直接返回null * @return 函数返回的值, 如果不需要直接返回null
* @throws NacosException * @throws Exception
* @throws IOException
*/ */
public static Object UseConfig(Function<Config, Object> execute) throws NacosException, IOException { public static Object UseConfig(String GroupAndDataID, Function<Config, Object> execute) throws Exception {
if(config == null) {
config = new Config(); Config cnf;
} synchronized(configDict) {
synchronized(config) { cnf = configDict.get(GroupAndDataID);
return execute.apply(config); if(cnf == null) {
cnf = new Config(GroupAndDataID);
configDict.put(GroupAndDataID, cnf);
} }
} }
// public Properties Prop ;
// private static Config cnf ;
// public Config(Properties prop) {
// Prop = prop;
// }
// public static Config GetConfig() throws NacosException, IOException {
// if(cnf == null) {
// cnf = new Config();
// }
// return cnf;
// }
public static void main(String[] args) throws NacosException, InterruptedException, IOException {
// String serverAddr = "192.168.1.113:8848";
// String dataId = "dataflow";
// String group = "yuandian";
// Properties properties = new Properties();
// properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
// ConfigService configService = NacosFactory.createConfigService(properties);
// String content = configService.getConfig(dataId, group, 5000);
// Yaml yaml = new Yaml();
// cnf = yaml.load(content);
// log.info(content);
// // Properties cnf = new Properties();
// // cnf.load(new StringReader(content));
// System.out.println(cnf);
// configService.addListener(dataId, group, new Listener() {
// @Override
// public void receiveConfigInfo(String configInfo) {
// System.out.println("recieve:" + configInfo);
// synchronized(cnf){
// cnf = (Map<String, Object>)new Yaml().load(configInfo);
// }
// System.out.println(cnf);
// }
// @Override
// public Executor getExecutor() {
// return null;
// }
// });
// cnf.put("publish", "123423112332");
// log.info(yaml.dumpAsMap(cnf));
Config.UseConfig((cnf) -> {
log.info("{}",cnf.configMap);
cnf.configMap.put("use_config", "12");
try { try {
log.info("{}",cnf.update()); cnf.datalock.lock();
} catch (NacosException e) { var res = execute.apply(cnf);
e.printStackTrace(); return res;
} catch (Exception e) {
throw e;
} finally {
cnf.datalock.unlock();
}
} }
return null;
});
/**
// boolean isPublishOk = configService.publishConfig(dataId, group, yaml.dumpAsMap(cnf)); * 统一使用配置的入口函数 线程安全
// System.out.println(isPublishOk); * @param execute 统一使用配置的入口匿名方法
* @return 函数返回的值, 如果不需要直接返回null
Thread.sleep(300000); * @throws Exception
*/
// boolean isPublishOk = configService.publishConfig(dataId, group, "content"); public static Object UseConfig(Function<Config, Object> execute) throws Exception {
// System.out.println(isPublishOk); return UseConfig(DEFAULT_GROUP_DATAID, execute);
}
// Thread.sleep(3000);
// content = configService.getConfig(dataId, group, 5000);
// System.out.println(content);
// boolean isRemoveOk = configService.removeConfig(dataId, group);
// System.out.println(isRemoveOk);
// Thread.sleep(3000);
// content = configService.getConfig(dataId, group, 5000);
// System.out.println(content);
// Thread.sleep(300000);
};
} }

View File

@ -0,0 +1,111 @@
package com.yuandian.dataflow.config;
import java.io.IOException;
import java.time.Instant;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.experimental.theories.suppliers.TestedOn;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ConfigTest {
/**
* 测试配置基础用法
* @throws Exception
*/
@Test
@Order(1)
void testUseConfig() throws Exception {
Config.ENV_TEST = "-test";
Config.UseConfig((cnf) -> {
log.info("{}",cnf.data);
Assert.assertEquals(cnf.get("key1", "key2"), "key_path");
Instant now = Instant.now();
cnf.data.put("use_config", now.toString());
try {
log.info("{}",cnf.update());
} catch (NacosException e) {
e.printStackTrace();
}
return null;
});
}
@Test
@Order(2)
void testRemove() throws Exception {
Config.ENV_TEST = "-test";
Config.UseConfig((cnf) -> {
cnf.remove("create1", "create2");
try {
log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update
} catch (NacosException e) {
e.printStackTrace();
}
cnf.remove("create1");
try {
log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update
} catch (NacosException e) {
e.printStackTrace();
}
return null;
});
}
@Test
@Order(1)
void testCreateKeys() throws Exception {
Config.ENV_TEST = "-test";
Config.UseConfig((cnf) -> {
log.info("{}",cnf.data);
cnf.seek("create1", "create2", "create3").createKeys().set(Instant.now().toString());;
try {
log.info("{}",cnf.update()); // 所有增加删除操作要最后同步到nacos. 都需要update
} catch (NacosException e) {
e.printStackTrace();
}
return null;
});
}
@Test
void testUpdate() throws Exception {
Config.ENV_TEST = "-test";
Config.UseConfig((cnf) -> {
Instant now = Instant.now();
cnf.data.put("use_config", now.toString());
try {
log.info("{}",cnf.update());
} catch (NacosException e) {
e.printStackTrace();
}
return null;
});
}
@Test
void testLabelConfig() throws Exception {
Config.UseConfig("org.fortest", (cnf)->{
log.info("{}", cnf.get("test"));
Assert.assertEquals(cnf.get("test"), "groupAndDataId");
return null;
});
}
}