Merge branch 'develop' of gitee.com:fusenpack/fusenapi into develop
This commit is contained in:
commit
ddb4ec944d
@ -18,7 +18,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/zeromicro/go-zero/core/logx"
|
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
|
|
||||||
_ "fusenapi/utils/auth"
|
_ "fusenapi/utils/auth"
|
||||||
@ -105,16 +104,8 @@ func main() {
|
|||||||
|
|
||||||
// 对/api开头的请求进行反向代理
|
// 对/api开头的请求进行反向代理
|
||||||
proxy := httputil.NewSingleHostReverseProxy(apiURL)
|
proxy := httputil.NewSingleHostReverseProxy(apiURL)
|
||||||
proxy.ErrorHandler = func(res http.ResponseWriter, req *http.Request, err error) {
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
// 在发生错误时进行处理
|
|
||||||
logx.Error(err)
|
|
||||||
logx.Error(res.Header())
|
|
||||||
res.WriteHeader(http.StatusNotFound) // 返回404状态码
|
|
||||||
}
|
|
||||||
}
|
|
||||||
proxy.ServeHTTP(w, r)
|
proxy.ServeHTTP(w, r)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
@ -218,22 +209,22 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac
|
|||||||
target := url.URL{Scheme: "ws", Host: strings.Split(backend.HttpAddress, "//")[1], Path: r.URL.Path}
|
target := url.URL{Scheme: "ws", Host: strings.Split(backend.HttpAddress, "//")[1], Path: r.URL.Path}
|
||||||
|
|
||||||
var transfer = func(src, dest *websocket.Conn) {
|
var transfer = func(src, dest *websocket.Conn) {
|
||||||
|
defer src.Close()
|
||||||
|
defer dest.Close()
|
||||||
|
// TODO: 可以做错误处理
|
||||||
for {
|
for {
|
||||||
mType, msg, err := src.ReadMessage()
|
mType, msg, err := src.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
break
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = dest.WriteMessage(mType, msg)
|
err = dest.WriteMessage(mType, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
break
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
src.Close()
|
|
||||||
dest.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
header := r.Header.Clone()
|
header := r.Header.Clone()
|
||||||
@ -257,6 +248,7 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac
|
|||||||
}
|
}
|
||||||
conn, err := upgrader.Upgrade(w, r, nil)
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// defer conn.Close()
|
// defer conn.Close()
|
||||||
@ -269,14 +261,14 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac
|
|||||||
// 解析目标URL,包含了查询参数
|
// 解析目标URL,包含了查询参数
|
||||||
targetURL, err := url.Parse(httpAddress + r.URL.String())
|
targetURL, err := url.Parse(httpAddress + r.URL.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, "Error parsing target URL", http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建新的请求
|
// 创建新的请求
|
||||||
proxyReq, err := http.NewRequest(r.Method, targetURL.String(), r.Body)
|
proxyReq, err := http.NewRequest(r.Method, targetURL.String(), r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, "Error creating proxy request", http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -294,7 +286,7 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac
|
|||||||
// 发送请求
|
// 发送请求
|
||||||
resp, err := backend.Client.Do(proxyReq)
|
resp, err := backend.Client.Do(proxyReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, "Error sending proxy request", http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
@ -310,7 +302,7 @@ func NewBackend(mux *http.ServeMux, httpAddress string, muxPaths ...string) *Bac
|
|||||||
w.WriteHeader(resp.StatusCode)
|
w.WriteHeader(resp.StatusCode)
|
||||||
_, err = io.Copy(w, resp.Body)
|
_, err = io.Copy(w, resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, "Error copying proxy response", http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,57 +4,11 @@ single_server_name=$1
|
|||||||
|
|
||||||
go mod tidy
|
go mod tidy
|
||||||
go mod vendor
|
go mod vendor
|
||||||
|
|
||||||
# 定义一个函数来在每个服务器目录下运行 go run <server_name>.go
|
|
||||||
run_server() {
|
|
||||||
server_name=$1
|
|
||||||
echo "Running $server_name"
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# 如果之前存在相同名字的 screen 会话,先将其终止
|
|
||||||
existing_session=$(screen -ls | grep -w "$server_name")
|
|
||||||
if [ -n "$existing_session" ]; then
|
|
||||||
echo "Terminating existing screen session for $server_name"
|
|
||||||
screen -S "$server_name" -X quit
|
|
||||||
fi
|
|
||||||
# 导航到相应的目录
|
|
||||||
cd server/$server_name
|
|
||||||
go build
|
|
||||||
|
|
||||||
[ -f .gitignore ] || echo $server_name > .gitignore
|
|
||||||
# 使用 screen 运行 go run <server_name>.go
|
|
||||||
screen -dmS $server_name -L ./$server_name
|
|
||||||
|
|
||||||
# 返回到上一级目录
|
|
||||||
cd - > /dev/null
|
|
||||||
}
|
|
||||||
|
|
||||||
find /tmp/go-build* -mmin +5 -exec rm -rf {} +
|
find /tmp/go-build* -mmin +5 -exec rm -rf {} +
|
||||||
find /tmp/go-link* -mmin +5 -exec rm -rf {} +
|
find /tmp/go-link* -mmin +5 -exec rm -rf {} +
|
||||||
|
|
||||||
server_dirs=() # 初始化一个空数组
|
run_proxyserver() {
|
||||||
|
# 定义目录和screen名称
|
||||||
if [ -n "$single_server_name" ]; then
|
|
||||||
server_dirs=("$single_server_name")
|
|
||||||
else
|
|
||||||
for dir in server/*/ ; do # 遍历 "server/" 下的所有子目录
|
|
||||||
dir=${dir%*/} # 删除末尾的 "/"
|
|
||||||
dir=${dir##*/} # 删除开头的 "server/"
|
|
||||||
server_dirs+=("$dir") # 添加到数组
|
|
||||||
done
|
|
||||||
fi
|
|
||||||
|
|
||||||
# 在每个服务器目录下运行相应的 go 程序
|
|
||||||
for server_dir in "${server_dirs[@]}"; do
|
|
||||||
run_server $server_dir
|
|
||||||
done
|
|
||||||
|
|
||||||
|
|
||||||
if [ -n "$single_server_name" ]; then
|
|
||||||
echo "no proxyserver restart"
|
|
||||||
else
|
|
||||||
# 定义目录和screen名称
|
|
||||||
dir_path="./proxyserver"
|
dir_path="./proxyserver"
|
||||||
screen_name="proxyserver"
|
screen_name="proxyserver"
|
||||||
|
|
||||||
@ -67,6 +21,77 @@ else
|
|||||||
fi
|
fi
|
||||||
go build
|
go build
|
||||||
# 启动新的screen session并运行go程序
|
# 启动新的screen session并运行go程序
|
||||||
|
echo "run $screen_name"
|
||||||
screen -dmS $screen_name -L ./$screen_name
|
screen -dmS $screen_name -L ./$screen_name
|
||||||
|
}
|
||||||
|
|
||||||
|
# 定义一个函数来在每个服务器目录下运行 go run <server_name>.go
|
||||||
|
run_server() {
|
||||||
|
server_name=$1
|
||||||
|
|
||||||
|
# 导航到相应的目录
|
||||||
|
cd server/$server_name
|
||||||
|
go build
|
||||||
|
echo "build $server_name"
|
||||||
|
|
||||||
|
# 如果之前存在相同名字的 screen 会话,先将其终止
|
||||||
|
# 首先尝试关闭已存在的screen会话
|
||||||
|
existing_session=$(screen -ls | grep -w "$server_name")
|
||||||
|
if [ -n "$existing_session" ]; then
|
||||||
|
echo "Terminating existing screen session for $server_name"
|
||||||
|
screen -S "$server_name" -X quit
|
||||||
|
while [[ $(screen -ls | grep "\.$server_name\s") ]]; do
|
||||||
|
sleep 0.1s # 等待0.1秒后再次检查
|
||||||
|
echo "wait for $server_name"
|
||||||
|
done
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 循环检查screen进程是否存在
|
||||||
|
|
||||||
|
[ -f .gitignore ] || echo $server_name > .gitignore
|
||||||
|
# 使用 screen 运行 go run <server_name>.go
|
||||||
|
|
||||||
|
echo "Running $server_name"
|
||||||
|
screen -dmS $server_name -L ./$server_name
|
||||||
|
|
||||||
|
# 返回到上一级目录
|
||||||
|
cd - > /dev/null
|
||||||
|
}
|
||||||
|
|
||||||
|
if [ "$single_server_name" = "proxyserver" ]; then
|
||||||
|
# 重启proxyserver的逻辑
|
||||||
|
run_proxyserver
|
||||||
|
else
|
||||||
|
|
||||||
|
server_dirs=() # 初始化一个空数组
|
||||||
|
|
||||||
|
if [ -n "$single_server_name" ]; then
|
||||||
|
server_dirs=("$single_server_name")
|
||||||
|
else
|
||||||
|
for dir in server/*/ ; do # 遍历 "server/" 下的所有子目录
|
||||||
|
dir=${dir%*/} # 删除末尾的 "/"
|
||||||
|
dir=${dir##*/} # 删除开头的 "server/"
|
||||||
|
server_dirs+=("$dir") # 添加到数组
|
||||||
|
done
|
||||||
|
fi
|
||||||
|
|
||||||
|
# 在每个服务器目录下运行相应的 go 程序
|
||||||
|
for server_dir in "${server_dirs[@]}"; do
|
||||||
|
run_server $server_dir
|
||||||
|
done
|
||||||
|
|
||||||
|
if [ -n "$single_server_name" ]; then
|
||||||
|
echo "no proxyserver restart"
|
||||||
|
else
|
||||||
|
run_proxyserver
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"fusenapi/utils/auth"
|
"fusenapi/utils/auth"
|
||||||
"fusenapi/utils/basic"
|
"fusenapi/utils/basic"
|
||||||
"fusenapi/utils/check"
|
"fusenapi/utils/check"
|
||||||
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"context"
|
"context"
|
||||||
@ -82,6 +83,26 @@ func (mquery *ModuleQuery) EncodeEmpty() map[string]any {
|
|||||||
return qstr
|
return qstr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func QueryDefault(conn *gorm.DB, module string, moduleQuery string, tname string) map[string]any {
|
||||||
|
|
||||||
|
qname := strings.Split(moduleQuery, ".")
|
||||||
|
queryAsName := qname[len(qname)-1]
|
||||||
|
sqlstr := fmt.Sprintf(
|
||||||
|
"select JSON_EXTRACT(metadata,'$.%s') as %s from %s where module = '%s' and user_id = 0 and guest_id = 0 order by ctime DESC limit 1",
|
||||||
|
moduleQuery, // logo_selected
|
||||||
|
queryAsName, // logo_selected
|
||||||
|
tname, // fs_user_info
|
||||||
|
module, // profile
|
||||||
|
)
|
||||||
|
raw := conn.Raw(sqlstr)
|
||||||
|
var info map[string]any
|
||||||
|
err := raw.Scan(&info).Error
|
||||||
|
if err == gorm.ErrRecordNotFound {
|
||||||
|
logx.Error(err)
|
||||||
|
}
|
||||||
|
return info
|
||||||
|
}
|
||||||
|
|
||||||
func (l *InfoLogic) Info(req *types.UserInfoRequest, userinfo *auth.UserInfo) (resp *basic.Response) {
|
func (l *InfoLogic) Info(req *types.UserInfoRequest, userinfo *auth.UserInfo) (resp *basic.Response) {
|
||||||
// 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data)
|
// 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data)
|
||||||
// userinfo 传入值时, 一定不为null
|
// userinfo 传入值时, 一定不为null
|
||||||
@ -178,6 +199,29 @@ func (l *InfoLogic) Info(req *types.UserInfoRequest, userinfo *auth.UserInfo) (r
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 隐含白板用户逻辑
|
||||||
|
if v, ok := metadict["userinfo.profile"]; ok {
|
||||||
|
|
||||||
|
if v == nil {
|
||||||
|
info := QueryDefault(l.svcCtx.MysqlConn, "profile", "logo_selected", "fs_user_info")
|
||||||
|
log.Println(info)
|
||||||
|
metadict["userinfo.profile"] = info
|
||||||
|
// log.Println(metadict)
|
||||||
|
} else {
|
||||||
|
profileDict := v.(map[string]any)
|
||||||
|
if _, ok := profileDict["logo_selected"]; !ok {
|
||||||
|
info := QueryDefault(l.svcCtx.MysqlConn, "profile", "logo_selected", "fs_user_info")
|
||||||
|
profileDict["logo_selected"] = info["logo_selected"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if v, ok := metadict["userinfo.profile.logo_selected"]; ok {
|
||||||
|
if v == nil {
|
||||||
|
info := QueryDefault(l.svcCtx.MysqlConn, "profile", "logo_selected", "fs_user_info")
|
||||||
|
metadict["userinfo.profile.logo_selected"] = info
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return resp.SetStatus(basic.CodeOK, metadict)
|
return resp.SetStatus(basic.CodeOK, metadict)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,9 +115,7 @@ func TestMain(t *testing.T) {
|
|||||||
if v, ok := metadict["userinfo.profile"]; ok {
|
if v, ok := metadict["userinfo.profile"]; ok {
|
||||||
|
|
||||||
if v == nil {
|
if v == nil {
|
||||||
|
|
||||||
info := QueryDefault(conn, "profile", "logo_selected", "fs_user_info")
|
info := QueryDefault(conn, "profile", "logo_selected", "fs_user_info")
|
||||||
log.Println(info)
|
|
||||||
metadict["userinfo.profile"] = info
|
metadict["userinfo.profile"] = info
|
||||||
// log.Println(metadict)
|
// log.Println(metadict)
|
||||||
} else {
|
} else {
|
||||||
@ -126,19 +124,15 @@ func TestMain(t *testing.T) {
|
|||||||
info := QueryDefault(conn, "profile", "logo_selected", "fs_user_info")
|
info := QueryDefault(conn, "profile", "logo_selected", "fs_user_info")
|
||||||
profileDict["logo_selected"] = info["logo_selected"]
|
profileDict["logo_selected"] = info["logo_selected"]
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if v, ok := metadict["userinfo.profile.logo_selected"]; ok {
|
} else if v, ok := metadict["userinfo.profile.logo_selected"]; ok {
|
||||||
if v == nil {
|
if v == nil {
|
||||||
|
info := QueryDefault(conn, "profile", "logo_selected", "fs_user_info")
|
||||||
|
metadict["userinfo.profile.logo_selected"] = info
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println(metadict)
|
|
||||||
|
|
||||||
return
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func QueryDefault(conn *gorm.DB, module string, moduleQuery string, tname string) map[string]any {
|
func QueryDefault(conn *gorm.DB, module string, moduleQuery string, tname string) map[string]any {
|
||||||
|
@ -10,6 +10,8 @@ import (
|
|||||||
"fusenapi/server/shopping-cart/internal/types"
|
"fusenapi/server/shopping-cart/internal/types"
|
||||||
"fusenapi/utils/auth"
|
"fusenapi/utils/auth"
|
||||||
"fusenapi/utils/basic"
|
"fusenapi/utils/basic"
|
||||||
|
"fusenapi/utils/file"
|
||||||
|
"fusenapi/utils/hash"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -51,6 +53,31 @@ func (l *AddToCartLogic) AddToCart(req *types.AddToCartReq, userinfo *auth.UserI
|
|||||||
if cartCount >= 100 {
|
if cartCount >= 100 {
|
||||||
return resp.SetStatusWithMessage(basic.CodeDbSqlErr, "sorry,the count of your carts can`t greater than 100")
|
return resp.SetStatusWithMessage(basic.CodeDbSqlErr, "sorry,the count of your carts can`t greater than 100")
|
||||||
}
|
}
|
||||||
|
if req.RenderImage != "" {
|
||||||
|
//上传base64文件
|
||||||
|
// 上传文件
|
||||||
|
var upload = file.Upload{
|
||||||
|
Ctx: l.ctx,
|
||||||
|
MysqlConn: l.svcCtx.MysqlConn,
|
||||||
|
AwsSession: l.svcCtx.AwsSession,
|
||||||
|
}
|
||||||
|
uploadRes, err := upload.UploadFileByBase64(&file.UploadBaseReq{
|
||||||
|
Source: "webGl render image",
|
||||||
|
FileHash: hash.JsonHashKey(req.RenderImage),
|
||||||
|
FileData: req.RenderImage,
|
||||||
|
Metadata: "",
|
||||||
|
UploadBucket: 1,
|
||||||
|
ApiType: 2,
|
||||||
|
UserId: userinfo.UserId,
|
||||||
|
GuestId: userinfo.GuestId,
|
||||||
|
FileByte: nil,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
logx.Error(err)
|
||||||
|
return resp.SetStatusWithMessage(basic.CodeFileUploadErr, "failed to upload webGl render image")
|
||||||
|
}
|
||||||
|
req.RenderImage = uploadRes.ResourceUrl
|
||||||
|
}
|
||||||
//获取产品是否存在
|
//获取产品是否存在
|
||||||
productInfo, err := l.svcCtx.AllModels.FsProduct.FindOne(l.ctx, req.ProductId)
|
productInfo, err := l.svcCtx.AllModels.FsProduct.FindOne(l.ctx, req.ProductId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -16,6 +16,7 @@ type ServiceContext struct {
|
|||||||
AllModels *gmodel.AllModelsGen
|
AllModels *gmodel.AllModelsGen
|
||||||
RabbitMq *initalize.RabbitMqHandle
|
RabbitMq *initalize.RabbitMqHandle
|
||||||
Repositories *initalize.Repositories
|
Repositories *initalize.Repositories
|
||||||
|
AwsSession *session.Session
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServiceContext(c config.Config) *ServiceContext {
|
func NewServiceContext(c config.Config) *ServiceContext {
|
||||||
@ -24,10 +25,11 @@ func NewServiceContext(c config.Config) *ServiceContext {
|
|||||||
Credentials: credentials.NewStaticCredentials(c.AWS.S3.Credentials.AccessKeyID, c.AWS.S3.Credentials.Secret, c.AWS.S3.Credentials.Token),
|
Credentials: credentials.NewStaticCredentials(c.AWS.S3.Credentials.AccessKeyID, c.AWS.S3.Credentials.Secret, c.AWS.S3.Credentials.Token),
|
||||||
}
|
}
|
||||||
return &ServiceContext{
|
return &ServiceContext{
|
||||||
Config: c,
|
Config: c,
|
||||||
MysqlConn: conn,
|
MysqlConn: conn,
|
||||||
AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)),
|
AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)),
|
||||||
RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil),
|
RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil),
|
||||||
|
AwsSession: session.Must(session.NewSession(&config)),
|
||||||
Repositories: initalize.NewAllRepositories(&initalize.NewAllRepositorieData{
|
Repositories: initalize.NewAllRepositories(&initalize.NewAllRepositorieData{
|
||||||
GormDB: conn,
|
GormDB: conn,
|
||||||
BLMServiceUrl: &c.BLMService.Url,
|
BLMServiceUrl: &c.BLMService.Url,
|
||||||
|
@ -67,9 +67,9 @@ var (
|
|||||||
//websocket连接存储
|
//websocket连接存储
|
||||||
mapConnPool = sync.Map{}
|
mapConnPool = sync.Map{}
|
||||||
//每个websocket连接入口缓冲队列长度默认值
|
//每个websocket连接入口缓冲队列长度默认值
|
||||||
websocketInChanLen = 500
|
websocketInChanLen = 1000
|
||||||
//每个websocket连接出口缓冲队列长度默认值
|
//每个websocket连接出口缓冲队列长度默认值
|
||||||
websocketOutChanLen = 500
|
websocketOutChanLen = 1000
|
||||||
//是否开启debug
|
//是否开启debug
|
||||||
openDebug = true
|
openDebug = true
|
||||||
//允许跨域的origin
|
//允许跨域的origin
|
||||||
|
@ -22,6 +22,8 @@ import (
|
|||||||
var (
|
var (
|
||||||
//每个websocket渲染任务缓冲队列长度默认值
|
//每个websocket渲染任务缓冲队列长度默认值
|
||||||
renderChanLen = 500
|
renderChanLen = 500
|
||||||
|
//每个websocket渲染并发数
|
||||||
|
renderChanConcurrency = 100
|
||||||
)
|
)
|
||||||
|
|
||||||
// 渲染处理器
|
// 渲染处理器
|
||||||
@ -30,9 +32,7 @@ type renderProcessor struct {
|
|||||||
|
|
||||||
// 云渲染属性
|
// 云渲染属性
|
||||||
type extendRenderProperty struct {
|
type extendRenderProperty struct {
|
||||||
renderChan chan websocket_data.RenderImageReqMsg //渲染消息入口的缓冲队列
|
renderChan chan websocket_data.RenderImageReqMsg //渲染消息入口的缓冲队列
|
||||||
colorSelectedIndex int //选择颜色索引
|
|
||||||
templateTag string //模板标签
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理分发到这里的数据
|
// 处理分发到这里的数据
|
||||||
@ -48,8 +48,6 @@ func (r *renderProcessor) allocationMessage(w *wsConnectItem, data []byte) {
|
|||||||
case <-w.closeChan: //已经关闭
|
case <-w.closeChan: //已经关闭
|
||||||
return
|
return
|
||||||
case w.extendRenderProperty.renderChan <- renderImageData: //发入到缓冲队列
|
case w.extendRenderProperty.renderChan <- renderImageData: //发入到缓冲队列
|
||||||
w.extendRenderProperty.colorSelectedIndex = renderImageData.RenderData.TemplateTagColor.SelectedColorIndex
|
|
||||||
w.extendRenderProperty.templateTag = renderImageData.RenderData.TemplateTag
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -58,36 +56,35 @@ func (r *renderProcessor) allocationMessage(w *wsConnectItem, data []byte) {
|
|||||||
func (w *wsConnectItem) consumeRenderImageData() {
|
func (w *wsConnectItem) consumeRenderImageData() {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
logx.Error("func renderImage err:", err)
|
logx.Error("func consumeRenderImageData err:", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
//限制并发
|
||||||
|
limitChan := make(chan struct{}, renderChanConcurrency)
|
||||||
|
defer close(limitChan)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-w.closeChan: //已关闭
|
case <-w.closeChan: //已关闭
|
||||||
return
|
return
|
||||||
case data := <-w.extendRenderProperty.renderChan: //消费数据
|
case data := <-w.extendRenderProperty.renderChan: //消费数据
|
||||||
//属性不同则不发送渲染
|
limitChan <- struct{}{}
|
||||||
if data.RenderData.TemplateTag != w.extendRenderProperty.templateTag {
|
go func(d websocket_data.RenderImageReqMsg) {
|
||||||
continue
|
defer func() {
|
||||||
}
|
if err := recover(); err != nil {
|
||||||
//属性不同则不发送渲染
|
logx.Error("func renderImage err:", err)
|
||||||
if data.RenderData.TemplateTagColor.SelectedColorIndex != w.extendRenderProperty.colorSelectedIndex {
|
}
|
||||||
continue
|
}()
|
||||||
}
|
defer func() {
|
||||||
w.renderImage(data)
|
<-limitChan
|
||||||
|
}()
|
||||||
|
w.renderImage(d)
|
||||||
|
}(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 执行渲染任务
|
// 执行渲染任务
|
||||||
func (w *wsConnectItem) renderImage(renderImageData websocket_data.RenderImageReqMsg) {
|
func (w *wsConnectItem) renderImage(renderImageData websocket_data.RenderImageReqMsg) {
|
||||||
//logx.Info("消费渲染数据:", string(data))
|
|
||||||
/*var renderImageData websocket_data.RenderImageReqMsg
|
|
||||||
if err := json.Unmarshal(data, &renderImageData); err != nil {
|
|
||||||
w.renderErrResponse(renderImageData.RenderId, renderImageData.RenderData.TemplateTag, "", "数据格式错误", renderImageData.RenderData.ProductId, w.userId, w.guestId, 0, 0, 0, 0)
|
|
||||||
logx.Error("invalid format of websocket render image message", err)
|
|
||||||
return
|
|
||||||
}*/
|
|
||||||
if renderImageData.RenderData.Logo == "" {
|
if renderImageData.RenderData.Logo == "" {
|
||||||
w.renderErrResponse(renderImageData.RenderId, renderImageData.RenderData.TemplateTag, "", "请传入logo", renderImageData.RenderData.ProductId, w.userId, w.guestId, 0, 0, 0, 0)
|
w.renderErrResponse(renderImageData.RenderId, renderImageData.RenderData.TemplateTag, "", "请传入logo", renderImageData.RenderData.ProductId, w.userId, w.guestId, 0, 0, 0, 0)
|
||||||
return
|
return
|
||||||
|
@ -316,8 +316,9 @@ func (l *defaultImageHandle) LogoCombine(ctx context.Context, in *LogoCombineReq
|
|||||||
|
|
||||||
var resultBLM constants.BLMServiceUrlResult
|
var resultBLM constants.BLMServiceUrlResult
|
||||||
err = curl.NewClient(ctx, &curl.Config{
|
err = curl.NewClient(ctx, &curl.Config{
|
||||||
BaseUrl: *l.BLMServiceUrl,
|
BaseUrl: *l.BLMServiceUrl,
|
||||||
Url: constants.BLMServiceUrlLogoCombine,
|
Url: constants.BLMServiceUrlLogoCombine,
|
||||||
|
RequireTimeout: time.Second * 30,
|
||||||
}).PostJson(postMap, &resultBLM)
|
}).PostJson(postMap, &resultBLM)
|
||||||
|
|
||||||
logc.Infof(ctx, "合图--算法请求--合图--结束时间:%v", time.Now().UTC())
|
logc.Infof(ctx, "合图--算法请求--合图--结束时间:%v", time.Now().UTC())
|
||||||
|
@ -20,8 +20,9 @@ func NewClient(ctx context.Context, c *Config) Client {
|
|||||||
client := resty.New().SetBaseURL(c.BaseUrl)
|
client := resty.New().SetBaseURL(c.BaseUrl)
|
||||||
|
|
||||||
// 设置超时时间为 5 分钟
|
// 设置超时时间为 5 分钟
|
||||||
client.SetTimeout(5 * time.Minute)
|
if c.RequireTimeout == 0 {
|
||||||
|
client.SetTimeout(5 * time.Minute)
|
||||||
|
}
|
||||||
/* 传输链路 */
|
/* 传输链路 */
|
||||||
tracer := otel.GetTracerProvider().Tracer(trace.TraceName)
|
tracer := otel.GetTracerProvider().Tracer(trace.TraceName)
|
||||||
spanCtx, span := tracer.Start(
|
spanCtx, span := tracer.Start(
|
||||||
@ -60,11 +61,12 @@ func NewClient(ctx context.Context, c *Config) Client {
|
|||||||
|
|
||||||
type (
|
type (
|
||||||
Config struct {
|
Config struct {
|
||||||
BaseUrl string `json:"base_url"`
|
BaseUrl string `json:"base_url"`
|
||||||
Url string `json:"url"`
|
Url string `json:"url"`
|
||||||
HeaderData map[string]string `json:"header_data"`
|
HeaderData map[string]string `json:"header_data"`
|
||||||
RetryCount int64 `json:"retry_count"`
|
RetryCount int64 `json:"retry_count"`
|
||||||
RetryWaitTime int64 `json:"retry_wait_time"`
|
RetryWaitTime int64 `json:"retry_wait_time"`
|
||||||
|
RequireTimeout time.Duration `json:"require_timeout"`
|
||||||
}
|
}
|
||||||
defaultClient struct {
|
defaultClient struct {
|
||||||
c *Config
|
c *Config
|
||||||
|
Loading…
x
Reference in New Issue
Block a user