基本完成需要的功能 TODO: 单元测试

This commit is contained in:
huangsimin 2019-07-09 14:38:51 +08:00
parent 08b920ed19
commit 64e48bedfe
3 changed files with 45 additions and 60 deletions

View File

@ -1,13 +1,9 @@
package cn.ecpark.service.usergw.biz.filters.factory; package cn.ecpark.service.usergw.biz.filters.factory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
@ -15,25 +11,14 @@ import org.apache.dubbo.rpc.service.GenericService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilter; import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory; import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.support.DefaultServerRequest;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyExtractor;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import cn.ecpark.service.usergw.biz.filters.bean.GenericServicePool; import cn.ecpark.service.usergw.biz.filters.bean.GenericServicePool;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;; import reactor.core.publisher.Mono;;
@Component @Component
@ -61,68 +46,63 @@ public class DubboGatewayFilterFactory extends AbstractGatewayFilterFactory<Dubb
@Override @Override
public GatewayFilter apply(Config config) { public GatewayFilter apply(Config config) {
String uri = config.dubboUri; String uri = config.dubboUri;
return (exchange, chain) -> { return (exchange, chain) -> {
GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);
GenericService gs = gsPool.get(uri);
// ServerRequest serverRequest = new DefaultServerRequest(exchange);
// Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).map(s -> {
// log.info("original text:{}", s);
// /**
// * do your decrypt() here and convert to String.class or you can change the
// * output class to change format
// */
// return "12 " + s;
// });
// modifiedBody.subscribe(databuferr -> {
// log.info(databuferr);
// });
ServerHttpRequest req = exchange.getRequest(); ServerHttpRequest req = exchange.getRequest();
HttpHeaders headers = req.getHeaders(); HttpHeaders headers = req.getHeaders();
List<String> methodString = headers.get("method"); List<String> methodString = headers.get("method");
List<String> params = headers.get("params"); List<String> params = headers.get("params");
if(methodString.size() != 0) { Object result = null;
// special ServerHttpResponse response = exchange.getResponse();
List<String> paramTypes = mehtods.get(methodString.get(0));
if(paramTypes != null) {
if(methodString.size() != 0) {
List<String> paramTypes;
//判断全部函数允许, 必须带参数类型, 而且要匹配, 否则报错
if(uri.charAt(5) == '-') {
paramTypes = headers.get("param-types");
} else {
paramTypes = mehtods.get(methodString.get(0));
}
if(paramTypes != null) {
int paramsSize = 0; int paramsSize = 0;
if(params != null) { if(params != null) {
paramsSize = params.size(); paramsSize = params.size();
} }
if(paramTypes.size() == paramsSize) { if(paramTypes.size() == paramsSize) {
Object result = null;
GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);
GenericService gs = gsPool.get(uri);
if(paramsSize == 0) { if(paramsSize == 0) {
result = gs.$invoke(methodString.get(0), new String[]{}, new Object[]{}); result = gs.$invoke(methodString.get(0), new String[]{}, new Object[]{});
} else { } else {
result = gs.$invoke(methodString.get(0), Arrays.copyOf(paramTypes.toArray(), paramTypes.size(), String[].class), params.toArray()); result = gs.$invoke(methodString.get(0), Arrays.copyOf(paramTypes.toArray(), paramTypes.size(), String[].class), params.toArray());
} }
if(result == null) {
ServerHttpResponse response = exchange.getResponse(); return response.setComplete();
if (result != null) { }
return response.writeWith(Mono
.just(response.bufferFactory().wrap(ByteBuffer.wrap(JSON.toJSONString(result).getBytes()))));
}
return response.setComplete();
}else { }else {
log.warn("paramTypes.size {} is not equals to params size {}", paramTypes.size(), params.size()); result = String.format("paramTypes.size %d is not equals to params size %d",
} paramTypes.size(),paramsSize);
log.warn((String)result);
}
} else { } else {
log.warn("mehtod: {} is not exist or not allowed", methodString.get(0)); result = String.format("mehtod: %s, param-types: null is not exist or not allowed", methodString.get(0));
log.warn((String)result);
} }
} else { } else {
log.info("queryParams.get(\"method\") is null"); result = String.format("queryParams.get(\"method\") is null");
log.warn((String)result);
} }
if (result != null) {
return response.writeWith(Mono
.just(response.bufferFactory().wrap(ByteBuffer.wrap(JSON.toJSONString(result).getBytes()))));
}
return chain.filter(exchange); return chain.filter(exchange);
}; };

View File

@ -243,6 +243,14 @@ public class ConfigGateway implements RouteDefinitionLocator {
reference.setConnections(3); reference.setConnections(3);
String UriString = "dubbo://"; String UriString = "dubbo://";
Object methods = iter.get("methods");
if(methods == null || ((List<Object>)methods).size() == 0) {
UriString = "dubbo-allowed://";
} else {
UriString = "dubbo://";
}
Object application = iter.get("application"); Object application = iter.get("application");
if (application != null) { if (application != null) {
BiConsumer<ReferenceConfig<GenericService>, Object> doFunc = specialField.get("application"); BiConsumer<ReferenceConfig<GenericService>, Object> doFunc = specialField.get("application");
@ -277,10 +285,7 @@ public class ConfigGateway implements RouteDefinitionLocator {
} }
} }
// Object methods = iter.get("methods");
// if(methods == null || ((List<Object>)methods).size() == 0) {
// DubboGatewayFilterFactory.mehtods.put(name, params);
// }
UriString += Extract.getReferenceConfigKey(reference); UriString += Extract.getReferenceConfigKey(reference);
GenericServicePool gsPool = appContext.getBean(GenericServicePool.class); GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);

View File

@ -33,12 +33,12 @@ dubbo:
- id: test - id: test
order: 0 order: 0
application: dubbo-exchange application: dubbo-exchange
methods: # 如果没填就从 request拿 意味着所有接口都可以使用 # methods: # 如果没填就从 request拿 意味着所有接口都可以使用
- name: Say # - name: Say
param-types: # param-types:
- java.lang.String # - java.lang.String
- name: Hello # - name: Hello
connections: 4 connections: 4
group: test group: test