From 5fadf0697d8ff1f353eab3339d35cb6692d5409e Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 24 Jul 2023 19:44:28 +0800 Subject: [PATCH 01/14] fix --- constants/websocket.go | 11 ++++++ .../internal/handler/datatransferhandler.go | 35 ++++++++++++++++--- 2 files changed, 41 insertions(+), 5 deletions(-) create mode 100644 constants/websocket.go diff --git a/constants/websocket.go b/constants/websocket.go new file mode 100644 index 00000000..95bea53d --- /dev/null +++ b/constants/websocket.go @@ -0,0 +1,11 @@ +package constants + +type websocket string + +// websocket消息类型 +const ( + //ws连接成功 + WEBSOCKET_CONNECT_SUCCESS = "connect-success" + //图片渲染 + WEBSOCKET_RENDER_IMAGE = "render-image" +) diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index 4534d041..e1458960 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -1,10 +1,17 @@ package handler import ( + "encoding/json" + "fusenapi/constants" "fusenapi/server/websocket/internal/svc" + "fusenapi/server/websocket/internal/types" + "fusenapi/utils/auth" + "github.com/google/uuid" "github.com/gorilla/websocket" + "github.com/zeromicro/go-zero/core/logx" "net/http" "sync" + "time" ) var ( @@ -19,14 +26,15 @@ var ( mapConn = sync.Map{} ) +// 每个连接的连接属性 type wsConnectItem struct { conn *websocket.Conn //websocket的连接 - renImage sync.Map //需要渲染的图片 + property interface{} //属性 } func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - /*// 解析JWT token,并对空用户进行判断 + // 解析JWT token,并对空用户进行判断 claims, err := svcCtx.ParseJwtToken(r) // 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 if err != nil { @@ -34,10 +42,9 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { w.Write([]byte("connect failed:unauthorized")) return } - var userInfo *auth.UserInfo if claims != nil { // 从token中获取对应的用户信息 - userInfo, err = auth.GetUserInfoFormMapClaims(claims) + _, err = auth.GetUserInfoFormMapClaims(claims) // 如果获取用户信息出错,则返回未授权的JSON响应并记录错误消息 if err != nil { return @@ -53,6 +60,24 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { logx.Error("http upgrade websocket err:", err) w.Write([]byte("http upgrade websocket err")) return - }*/ + } + //生成连接唯一标识 + uniqueId := uuid.New().String() + time.Now().Format("20060102150405") + c := wsConnectItem{ + conn: conn, + } + //保存连接 + mapConn.Store(uniqueId, c) + //把uniqueId传回去 + rsp := types.DataTransferRsp{ + MsgType: constants.WEBSOCKET_CONNECT_SUCCESS, + Message: uniqueId, + } + b, _ := json.Marshal(rsp) + conn.WriteMessage(websocket.TextMessage, b) } } + +func (w *wsConnectItem) Readloop() { + +} From a4164dbdf6da0c2b52ead4ba3396fb475df338a6 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 12:06:19 +0800 Subject: [PATCH 02/14] fix --- constants/websocket.go | 4 + server/websocket/etc/websocket.yaml | 2 +- server/websocket/internal/config/config.go | 5 +- .../internal/handler/datatransferhandler.go | 128 ++++++++++++++---- 4 files changed, 114 insertions(+), 25 deletions(-) diff --git a/constants/websocket.go b/constants/websocket.go index 95bea53d..7fca9751 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -4,8 +4,12 @@ type websocket string // websocket消息类型 const ( + //鉴权失败 + WEBSOCKET_UNAUTH = "unAuth" //ws连接成功 WEBSOCKET_CONNECT_SUCCESS = "connect-success" + //心跳信息 + WEBSOCKET_HEARTBEAT = "heartbeat" //图片渲染 WEBSOCKET_RENDER_IMAGE = "render-image" ) diff --git a/server/websocket/etc/websocket.yaml b/server/websocket/etc/websocket.yaml index d22f03a7..7d2b68d7 100644 --- a/server/websocket/etc/websocket.yaml +++ b/server/websocket/etc/websocket.yaml @@ -1,6 +1,6 @@ Name: websocket Host: 0.0.0.0 -Port: 8888 +Port: 9914 SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest Auth: AccessSecret: fusen2023 diff --git a/server/websocket/internal/config/config.go b/server/websocket/internal/config/config.go index b24bb7d7..85ee62f1 100644 --- a/server/websocket/internal/config/config.go +++ b/server/websocket/internal/config/config.go @@ -1,6 +1,9 @@ package config -import "github.com/zeromicro/go-zero/rest" +import ( + "fusenapi/server/websocket/internal/types" + "github.com/zeromicro/go-zero/rest" +) type Config struct { rest.RestConf diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index e1458960..96a65455 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -2,6 +2,7 @@ package handler import ( "encoding/json" + "fmt" "fusenapi/constants" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" @@ -28,18 +29,32 @@ var ( // 每个连接的连接属性 type wsConnectItem struct { - conn *websocket.Conn //websocket的连接 - property interface{} //属性 + conn *websocket.Conn //websocket的连接 + closeChan chan struct{} //关闭chan + isClose bool //是否已经关闭 + flag string + inChan chan interface{} //接受消息缓冲通道 + property interface{} //属性 } func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + //升级websocket + conn, err := upgrade.Upgrade(w, r, nil) + if err != nil { + logx.Error("http upgrade websocket err:", err) + return + } + defer conn.Close() + rsp := types.DataTransferRsp{} // 解析JWT token,并对空用户进行判断 claims, err := svcCtx.ParseJwtToken(r) // 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 if err != nil { - logx.Info("unauthorized:", err.Error()) // 记录错误日志 - w.Write([]byte("connect failed:unauthorized")) + rsp.MsgType = constants.WEBSOCKET_UNAUTH + rsp.Message = "unAuth" + b, _ := json.Marshal(rsp) + _ = conn.WriteMessage(websocket.TextMessage, b) return } if claims != nil { @@ -47,37 +62,104 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { _, err = auth.GetUserInfoFormMapClaims(claims) // 如果获取用户信息出错,则返回未授权的JSON响应并记录错误消息 if err != nil { + rsp.MsgType = constants.WEBSOCKET_UNAUTH + rsp.Message = "unAuth!!" + b, _ := json.Marshal(rsp) + _ = conn.WriteMessage(websocket.TextMessage, b) return } } else { // 如果claims为nil,则认为用户身份为白板用户 - w.Write([]byte("connect failed:unauthorized!!")) - return - } - //升级websocket - conn, err := upgrade.Upgrade(w, r, r.Header) - if err != nil { - logx.Error("http upgrade websocket err:", err) - w.Write([]byte("http upgrade websocket err")) + rsp.MsgType = constants.WEBSOCKET_UNAUTH + rsp.Message = "unAuth!!!" + b, _ := json.Marshal(rsp) + _ = conn.WriteMessage(websocket.TextMessage, b) return } //生成连接唯一标识 - uniqueId := uuid.New().String() + time.Now().Format("20060102150405") - c := wsConnectItem{ - conn: conn, + flag := uuid.New().String() + time.Now().Format("20060102150405") + ws := wsConnectItem{ + conn: conn, + flag: flag, + closeChan: make(chan struct{}, 1), + inChan: make(chan interface{}, 1000), + isClose: false, } //保存连接 - mapConn.Store(uniqueId, c) - //把uniqueId传回去 - rsp := types.DataTransferRsp{ - MsgType: constants.WEBSOCKET_CONNECT_SUCCESS, - Message: uniqueId, - } + mapConn.Store(flag, ws) + defer ws.close() + //把连接成功消息发回去 + rsp.MsgType = constants.WEBSOCKET_CONNECT_SUCCESS + rsp.Message = flag b, _ := json.Marshal(rsp) conn.WriteMessage(websocket.TextMessage, b) + //循环读客户端信息 + go ws.readLoop() + //推消息到云渲染 + go ws.sendLoop() + //心跳 + for { + time.Sleep(time.Second * 3) + select { + case <-ws.closeChan: + return + default: + //发送心跳信息 + rsp.MsgType = constants.WEBSOCKET_HEARTBEAT + rsp.Message = "heartbeat" + b, _ = json.Marshal(rsp) + if err = conn.WriteMessage(websocket.TextMessage, b); err != nil { + logx.Error("发送心跳信息异常,关闭连接:", flag, err) + ws.close() + return + } + } + } + } } -func (w *wsConnectItem) Readloop() { - +// 关闭连接 +func (w *wsConnectItem) close() { + logx.Info("websocket:", w.flag, " is closing...") + w.conn.Close() + mapConn.Delete(w.flag) + if !w.isClose { + w.isClose = true + close(w.closeChan) + close(w.inChan) + } + logx.Info("websocket:", w.flag, " is closed") +} + +// 接受客户端发来的消息 +func (w *wsConnectItem) readLoop() { + for { + select { + case <-w.closeChan: //如果关闭了 + return + default: + _, data, err := w.conn.ReadMessage() + if err != nil { + logx.Error("接受信息错误:", err) + //关闭连接 + w.close() + return + } + //消息传入缓冲通道 + w.inChan <- data + } + } +} + +// 把收到的消息发往不同的地方处理 +func (w *wsConnectItem) sendLoop() { + for { + select { + case <-w.closeChan: + return + case data := <-w.inChan: + fmt.Println(data) + } + } } From 9c72a8a4c9f3e99f0b7cd054eead36f594b4afbc Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 17:10:50 +0800 Subject: [PATCH 03/14] fix --- .../internal/handler/datatransferhandler.go | 129 +++++++++++++----- .../internal/handler/rendernotifyhandler.go | 62 +++++++++ server/websocket/internal/handler/routes.go | 5 + .../internal/logic/rendernotifylogic.go | 43 ++++++ server/websocket/internal/types/types.go | 32 ++++- server_api/websocket.api | 32 ++++- 6 files changed, 262 insertions(+), 41 deletions(-) create mode 100644 server/websocket/internal/handler/rendernotifyhandler.go create mode 100644 server/websocket/internal/logic/rendernotifylogic.go diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index 96a65455..e07d3abe 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -4,9 +4,9 @@ import ( "encoding/json" "fmt" "fusenapi/constants" + "fusenapi/server/websocket/internal/logic" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" - "fusenapi/utils/auth" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/zeromicro/go-zero/core/logx" @@ -24,21 +24,27 @@ var ( }, } //连接map池 - mapConn = sync.Map{} + mapConnPool = sync.Map{} ) // 每个连接的连接属性 type wsConnectItem struct { - conn *websocket.Conn //websocket的连接 - closeChan chan struct{} //关闭chan - isClose bool //是否已经关闭 - flag string - inChan chan interface{} //接受消息缓冲通道 - property interface{} //属性 + conn *websocket.Conn //websocket的连接 + closeChan chan struct{} //关闭chan + isClose bool //是否已经关闭 + flag string + inChan chan []byte //接受消息缓冲通道 + outChan chan []byte //发送回客户端的消息 + mutex sync.Mutex + renderImage map[string]struct{} //需要渲染的图片 } func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + // 创建一个业务逻辑层实例 + var req types.DataTransferReq + l := logic.NewDataTransferLogic(r.Context(), svcCtx) + l.DataTransfer(&req, nil) //升级websocket conn, err := upgrade.Upgrade(w, r, nil) if err != nil { @@ -48,7 +54,7 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { defer conn.Close() rsp := types.DataTransferRsp{} // 解析JWT token,并对空用户进行判断 - claims, err := svcCtx.ParseJwtToken(r) + /*claims, err := svcCtx.ParseJwtToken(r) // 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 if err != nil { rsp.MsgType = constants.WEBSOCKET_UNAUTH @@ -75,18 +81,20 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) return - } + }*/ //生成连接唯一标识 flag := uuid.New().String() + time.Now().Format("20060102150405") ws := wsConnectItem{ - conn: conn, - flag: flag, - closeChan: make(chan struct{}, 1), - inChan: make(chan interface{}, 1000), - isClose: false, + conn: conn, + flag: flag, + closeChan: make(chan struct{}, 1), + inChan: make(chan []byte, 100), + outChan: make(chan []byte, 100), + renderImage: make(map[string]struct{}), + isClose: false, } //保存连接 - mapConn.Store(flag, ws) + mapConnPool.Store(flag, ws) defer ws.close() //把连接成功消息发回去 rsp.MsgType = constants.WEBSOCKET_CONNECT_SUCCESS @@ -95,43 +103,66 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { conn.WriteMessage(websocket.TextMessage, b) //循环读客户端信息 go ws.readLoop() + //循环把数据发送给客户端 + go ws.writeLoop() //推消息到云渲染 go ws.sendLoop() //心跳 - for { - time.Sleep(time.Second * 3) - select { - case <-ws.closeChan: + ws.heartbeat() + + } +} + +// 心跳 +func (w *wsConnectItem) heartbeat() { + rsp := types.DataTransferRsp{ + MsgType: constants.WEBSOCKET_HEARTBEAT, + Message: "", + } + for { + time.Sleep(time.Second * 10) + select { + case <-w.closeChan: + return + default: + //发送心跳信息 + b, _ := json.Marshal(rsp) + if err := w.conn.WriteMessage(websocket.TextMessage, b); err != nil { + logx.Error("发送心跳信息异常,关闭连接:", w.flag, err) + w.close() return - default: - //发送心跳信息 - rsp.MsgType = constants.WEBSOCKET_HEARTBEAT - rsp.Message = "heartbeat" - b, _ = json.Marshal(rsp) - if err = conn.WriteMessage(websocket.TextMessage, b); err != nil { - logx.Error("发送心跳信息异常,关闭连接:", flag, err) - ws.close() - return - } } } - } } // 关闭连接 func (w *wsConnectItem) close() { + w.mutex.Lock() + defer w.mutex.Unlock() logx.Info("websocket:", w.flag, " is closing...") w.conn.Close() - mapConn.Delete(w.flag) + mapConnPool.Delete(w.flag) if !w.isClose { w.isClose = true close(w.closeChan) + close(w.outChan) close(w.inChan) } logx.Info("websocket:", w.flag, " is closed") } +func (w *wsConnectItem) writeLoop() { + for { + select { + case <-w.closeChan: //如果关闭了 + return + case data := <-w.outChan: + w.conn.WriteMessage(websocket.TextMessage, data) + } + } +} + // 接受客户端发来的消息 func (w *wsConnectItem) readLoop() { for { @@ -159,7 +190,39 @@ func (w *wsConnectItem) sendLoop() { case <-w.closeChan: return case data := <-w.inChan: - fmt.Println(data) + w.dealwithReciveData(data) } } } + +// 处理接受到的数据 +func (w *wsConnectItem) dealwithReciveData(data []byte) { + var parseInfo types.DataTransferReq + if err := json.Unmarshal(data, &parseInfo); err != nil { + logx.Error("invalid format of websocket message") + return + } + switch parseInfo.MsgType { + case constants.WEBSOCKET_RENDER_IMAGE: //图片渲染 + var renderImageData []types.RenderImageReqMsg + if err := json.Unmarshal([]byte(parseInfo.Message), &renderImageData); err != nil { + logx.Error("invalid format of websocket render image message", err) + return + } + logx.Info("收到请求云渲染图片数据:", renderImageData) + w.mutex.Lock() + defer w.mutex.Unlock() + //把需要渲染的图片加进去 + for _, v := range renderImageData { + key := getRenderImageMapKey(v.ProductId, v.SizeId, v.TemplateId) + w.renderImage[key] = struct{}{} + } + default: + + } +} + +// 获取需要渲染图片的map key +func getRenderImageMapKey(productId, sizeId, templateId int64) string { + return fmt.Sprintf("%d-%d-%d", productId, sizeId, templateId) +} diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go new file mode 100644 index 00000000..9c2eb28d --- /dev/null +++ b/server/websocket/internal/handler/rendernotifyhandler.go @@ -0,0 +1,62 @@ +package handler + +import ( + "encoding/json" + "fusenapi/utils/basic" + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/rest/httpx" + "net/http" + + "fusenapi/server/websocket/internal/svc" + "fusenapi/server/websocket/internal/types" +) + +func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.RenderNotifyReq + _, err := basic.RequestParse(w, r, svcCtx, &req) + if err != nil { + return + } + mapConnPool.Range(func(key, value any) bool { + ws, ok := value.(wsConnectItem) + if !ok { + return false + } + setOutRenderImage(req, ws) + return true + }) + httpx.OkJsonCtx(r.Context(), w, basic.Response{ + Code: 200, + Message: "success", + Data: nil, + }) + } +} + +// 把渲染好的数据放入outchan +func setOutRenderImage(req types.RenderNotifyReq, ws wsConnectItem) { + ws.mutex.Lock() + defer ws.mutex.Unlock() + for _, notifyItem := range req.NotifyList { + renderKey := getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) + //加载并删除 + _, ok := ws.renderImage[renderKey] + if !ok { + continue + } + responseData := types.RenderImageRspMsg{ + ProductId: notifyItem.ProductId, + SizeId: notifyItem.SizeId, + TemplateId: notifyItem.TemplateId, + Source: "我是渲染资源", + } + b, _ := json.Marshal(responseData) + select { + case <-ws.closeChan: + return + case ws.outChan <- b: + logx.Info("notify send render result to out chan") + } + } +} diff --git a/server/websocket/internal/handler/routes.go b/server/websocket/internal/handler/routes.go index fd3200e4..859ecc5b 100644 --- a/server/websocket/internal/handler/routes.go +++ b/server/websocket/internal/handler/routes.go @@ -17,6 +17,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/api/websocket/data_transfer", Handler: DataTransferHandler(serverCtx), }, + { + Method: http.MethodPost, + Path: "/api/websocket/render_notify", + Handler: RenderNotifyHandler(serverCtx), + }, }, ) } diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go new file mode 100644 index 00000000..49803d29 --- /dev/null +++ b/server/websocket/internal/logic/rendernotifylogic.go @@ -0,0 +1,43 @@ +package logic + +import ( + "fusenapi/utils/auth" + "fusenapi/utils/basic" + + "context" + + "fusenapi/server/websocket/internal/svc" + "fusenapi/server/websocket/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type RenderNotifyLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewRenderNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *RenderNotifyLogic { + return &RenderNotifyLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +// 处理进入前逻辑w,r +// func (l *RenderNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) { +// } + +// 处理逻辑后 w,r 如:重定向, resp 必须重新处理 +// func (l *RenderNotifyLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) { +// // httpx.OkJsonCtx(r.Context(), w, resp) +// } + +func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { + // 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data) + // userinfo 传入值时, 一定不为null + + return resp.SetStatus(basic.CodeOK) +} diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go index 06201d39..35df171d 100644 --- a/server/websocket/internal/types/types.go +++ b/server/websocket/internal/types/types.go @@ -6,13 +6,37 @@ import ( ) type DataTransferReq struct { - MsgType string `json:"msg_type"` //消息类型 - Message interface{} `json:"message"` //传递的消息 + MsgType string `json:"msg_type"` //消息类型 + Message string `json:"message"` //传递的消息 } type DataTransferRsp struct { - MsgType string `json:"msg_type"` //消息类型 - Message interface{} `json:"message"` //传递的消息 + MsgType string `json:"msg_type"` //消息类型 + Message string `json:"message"` //传递的消息 +} + +type RenderImageReqMsg struct { + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` +} + +type RenderImageRspMsg struct { + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` + Source string `json:"source"` +} + +type RenderNotifyReq struct { + NotifyList []NotifyItem `json:"notify_list"` +} + +type NotifyItem struct { + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` + Source string `json:"source"` } type Request struct { diff --git a/server_api/websocket.api b/server_api/websocket.api index 786e67e7..ee90cdcc 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -12,14 +12,38 @@ service websocket { //websocket数据交互 @handler DataTransferHandler get /api/websocket/data_transfer(DataTransferReq) returns (response); + //渲染完了通知接口 + @handler RenderNotifyHandler + post /api/websocket/render_notify(RenderNotifyReq) returns (response); } //websocket数据交互 type DataTransferReq { - MsgType string `json:"msg_type"` //消息类型 - Message interface{} `json:"message"` //传递的消息 + MsgType string `json:"msg_type"` //消息类型 + Message string `json:"message"` //传递的消息 } type DataTransferRsp { - MsgType string `json:"msg_type"` //消息类型 - Message interface{} `json:"message"` //传递的消息 + MsgType string `json:"msg_type"` //消息类型 + Message string `json:"message"` //传递的消息 +} +type RenderImageReqMsg { //websocket接受需要云渲染的图片 + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` +} +type RenderImageRspMsg { //websocket发送渲染完的数据 + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` + Source string `json:"source"` +} +//渲染完了通知接口 +type RenderNotifyReq { + NotifyList []NotifyItem `json:"notify_list"` +} +type NotifyItem { + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` + Source string `json:"source"` } \ No newline at end of file From 17aac7ad664864f15a639d7700c1412831837fd3 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 17:12:37 +0800 Subject: [PATCH 04/14] fix --- server/websocket/internal/handler/datatransferhandler.go | 4 ++-- server/websocket/internal/handler/rendernotifyhandler.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index e07d3abe..9e0363eb 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -214,7 +214,7 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { defer w.mutex.Unlock() //把需要渲染的图片加进去 for _, v := range renderImageData { - key := getRenderImageMapKey(v.ProductId, v.SizeId, v.TemplateId) + key := w.getRenderImageMapKey(v.ProductId, v.SizeId, v.TemplateId) w.renderImage[key] = struct{}{} } default: @@ -223,6 +223,6 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { } // 获取需要渲染图片的map key -func getRenderImageMapKey(productId, sizeId, templateId int64) string { +func (w *wsConnectItem) getRenderImageMapKey(productId, sizeId, templateId int64) string { return fmt.Sprintf("%d-%d-%d", productId, sizeId, templateId) } diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go index 9c2eb28d..6dde0c3f 100644 --- a/server/websocket/internal/handler/rendernotifyhandler.go +++ b/server/websocket/internal/handler/rendernotifyhandler.go @@ -39,7 +39,7 @@ func setOutRenderImage(req types.RenderNotifyReq, ws wsConnectItem) { ws.mutex.Lock() defer ws.mutex.Unlock() for _, notifyItem := range req.NotifyList { - renderKey := getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) + renderKey := ws.getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) //加载并删除 _, ok := ws.renderImage[renderKey] if !ok { From d0fc91b61b25db36a881162e6edb2b450bd59064 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 17:41:38 +0800 Subject: [PATCH 05/14] fix --- constants/websocket.go | 3 ++ .../internal/handler/datatransferhandler.go | 3 +- .../internal/handler/rendernotifyhandler.go | 46 ++++++++++++++++++- server/websocket/internal/types/types.go | 2 + server_api/websocket.api | 2 + 5 files changed, 54 insertions(+), 2 deletions(-) diff --git a/constants/websocket.go b/constants/websocket.go index 7fca9751..a9a6aa5d 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -13,3 +13,6 @@ const ( //图片渲染 WEBSOCKET_RENDER_IMAGE = "render-image" ) + +// 云渲染通知需要的签名字符串 +const RENDER_NOTIFY_SIGN_KEY = "fusen-render-notify-%s-%d" diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index 9e0363eb..afcaf88d 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -203,7 +203,8 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { return } switch parseInfo.MsgType { - case constants.WEBSOCKET_RENDER_IMAGE: //图片渲染 + //图片渲染 + case constants.WEBSOCKET_RENDER_IMAGE: var renderImageData []types.RenderImageReqMsg if err := json.Unmarshal([]byte(parseInfo.Message), &renderImageData); err != nil { logx.Error("invalid format of websocket render image message", err) diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go index 6dde0c3f..5fa2371a 100644 --- a/server/websocket/internal/handler/rendernotifyhandler.go +++ b/server/websocket/internal/handler/rendernotifyhandler.go @@ -1,11 +1,16 @@ package handler import ( + "crypto/sha256" + "encoding/hex" "encoding/json" + "fmt" + "fusenapi/constants" "fusenapi/utils/basic" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/rest/httpx" "net/http" + "time" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" @@ -16,8 +21,45 @@ func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { var req types.RenderNotifyReq _, err := basic.RequestParse(w, r, svcCtx, &req) if err != nil { + httpx.OkJsonCtx(r.Context(), w, basic.Response{ + Code: basic.CodeRequestParamsErr.Code, + Message: "err param", + Data: nil, + }) return } + if len(req.NotifyList) == 0 { + httpx.OkJsonCtx(r.Context(), w, basic.Response{ + Code: basic.CodeRequestParamsErr.Code, + Message: "invalid param,notify list is empty", + Data: nil, + }) + return + } + if time.Now().Unix()-120 > req.Time { + httpx.OkJsonCtx(r.Context(), w, basic.Response{ + Code: basic.CodeRequestParamsErr.Code, + Message: "invalid param,time is expired", + Data: nil, + }) + return + } + //验证签名 sha256 + notifyByte, _ := json.Marshal(req.NotifyList) + h := sha256.New() + h.Write([]byte(fmt.Sprintf(constants.RENDER_NOTIFY_SIGN_KEY, string(notifyByte), req.Time))) + signHex := h.Sum(nil) + sign := hex.EncodeToString(signHex) + fmt.Println(sign) + if req.Sign != sign { + httpx.OkJsonCtx(r.Context(), w, basic.Response{ + Code: basic.CodeRequestParamsErr.Code, + Message: "invalid sign", + Data: nil, + }) + return + } + //遍历链接 mapConnPool.Range(func(key, value any) bool { ws, ok := value.(wsConnectItem) if !ok { @@ -40,7 +82,7 @@ func setOutRenderImage(req types.RenderNotifyReq, ws wsConnectItem) { defer ws.mutex.Unlock() for _, notifyItem := range req.NotifyList { renderKey := ws.getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) - //加载并删除 + //查询 _, ok := ws.renderImage[renderKey] if !ok { continue @@ -58,5 +100,7 @@ func setOutRenderImage(req types.RenderNotifyReq, ws wsConnectItem) { case ws.outChan <- b: logx.Info("notify send render result to out chan") } + //删掉已经处理的渲染任务 + delete(ws.renderImage, renderKey) } } diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go index 35df171d..6082881a 100644 --- a/server/websocket/internal/types/types.go +++ b/server/websocket/internal/types/types.go @@ -29,6 +29,8 @@ type RenderImageRspMsg struct { } type RenderNotifyReq struct { + Sign string `json:"sign"` + Time int64 `json:"time"` NotifyList []NotifyItem `json:"notify_list"` } diff --git a/server_api/websocket.api b/server_api/websocket.api index ee90cdcc..4b5c1fed 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -39,6 +39,8 @@ type RenderImageRspMsg { //websocket发送渲染完的数据 } //渲染完了通知接口 type RenderNotifyReq { + Sign string `json:"sign"` + Time int64 `json:"time"` NotifyList []NotifyItem `json:"notify_list"` } type NotifyItem { From b0f598af6816880ac72124e6ad08d0448d80aba5 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 18:04:01 +0800 Subject: [PATCH 06/14] fix --- .../internal/handler/datatransferhandler.go | 12 ++++++------ .../internal/handler/rendernotifyhandler.go | 1 - server/websocket/internal/types/types.go | 8 ++++---- server_api/websocket.api | 8 ++++---- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index afcaf88d..827ac058 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -97,8 +97,8 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { mapConnPool.Store(flag, ws) defer ws.close() //把连接成功消息发回去 - rsp.MsgType = constants.WEBSOCKET_CONNECT_SUCCESS - rsp.Message = flag + rsp.T = constants.WEBSOCKET_CONNECT_SUCCESS + rsp.D = flag b, _ := json.Marshal(rsp) conn.WriteMessage(websocket.TextMessage, b) //循环读客户端信息 @@ -116,8 +116,8 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { // 心跳 func (w *wsConnectItem) heartbeat() { rsp := types.DataTransferRsp{ - MsgType: constants.WEBSOCKET_HEARTBEAT, - Message: "", + T: constants.WEBSOCKET_HEARTBEAT, + D: "", } for { time.Sleep(time.Second * 10) @@ -202,11 +202,11 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { logx.Error("invalid format of websocket message") return } - switch parseInfo.MsgType { + switch parseInfo.T { //图片渲染 case constants.WEBSOCKET_RENDER_IMAGE: var renderImageData []types.RenderImageReqMsg - if err := json.Unmarshal([]byte(parseInfo.Message), &renderImageData); err != nil { + if err := json.Unmarshal([]byte(parseInfo.D), &renderImageData); err != nil { logx.Error("invalid format of websocket render image message", err) return } diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go index 5fa2371a..de6e3e21 100644 --- a/server/websocket/internal/handler/rendernotifyhandler.go +++ b/server/websocket/internal/handler/rendernotifyhandler.go @@ -50,7 +50,6 @@ func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { h.Write([]byte(fmt.Sprintf(constants.RENDER_NOTIFY_SIGN_KEY, string(notifyByte), req.Time))) signHex := h.Sum(nil) sign := hex.EncodeToString(signHex) - fmt.Println(sign) if req.Sign != sign { httpx.OkJsonCtx(r.Context(), w, basic.Response{ Code: basic.CodeRequestParamsErr.Code, diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go index 6082881a..6a1cf750 100644 --- a/server/websocket/internal/types/types.go +++ b/server/websocket/internal/types/types.go @@ -6,13 +6,13 @@ import ( ) type DataTransferReq struct { - MsgType string `json:"msg_type"` //消息类型 - Message string `json:"message"` //传递的消息 + T string `json:"t"` //消息类型 + D string `json:"d"` //传递的消息 } type DataTransferRsp struct { - MsgType string `json:"msg_type"` //消息类型 - Message string `json:"message"` //传递的消息 + T string `json:"t"` //消息类型 + D string `json:"d"` //传递的消息 } type RenderImageReqMsg struct { diff --git a/server_api/websocket.api b/server_api/websocket.api index 4b5c1fed..d0daf98b 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -19,12 +19,12 @@ service websocket { //websocket数据交互 type DataTransferReq { - MsgType string `json:"msg_type"` //消息类型 - Message string `json:"message"` //传递的消息 + T string `json:"t"` //消息类型 + D string `json:"d"` //传递的消息 } type DataTransferRsp { - MsgType string `json:"msg_type"` //消息类型 - Message string `json:"message"` //传递的消息 + T string `json:"t"` //消息类型 + D string `json:"d"` //传递的消息 } type RenderImageReqMsg { //websocket接受需要云渲染的图片 ProductId int64 `json:"product_id"` From 932ee1a578484ab46078caf4ef8af7c7558759d7 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 18:21:30 +0800 Subject: [PATCH 07/14] fix --- server/websocket/internal/handler/rendernotifyhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go index de6e3e21..f7dfa3ec 100644 --- a/server/websocket/internal/handler/rendernotifyhandler.go +++ b/server/websocket/internal/handler/rendernotifyhandler.go @@ -36,7 +36,7 @@ func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { }) return } - if time.Now().Unix()-120 > req.Time { + if time.Now().Unix()-120 > req.Time || req.Time > time.Now().Unix() { httpx.OkJsonCtx(r.Context(), w, basic.Response{ Code: basic.CodeRequestParamsErr.Code, Message: "invalid param,time is expired", From 222b9ec155537e9542f76c3137bc5c74820ff094 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 18:34:56 +0800 Subject: [PATCH 08/14] fix --- .../internal/handler/datatransferhandler.go | 34 ++++++++++--------- .../internal/handler/rendernotifyhandler.go | 2 +- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index 827ac058..d981d377 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -125,13 +125,14 @@ func (w *wsConnectItem) heartbeat() { case <-w.closeChan: return default: - //发送心跳信息 - b, _ := json.Marshal(rsp) - if err := w.conn.WriteMessage(websocket.TextMessage, b); err != nil { - logx.Error("发送心跳信息异常,关闭连接:", w.flag, err) - w.close() - return - } + + } + //发送心跳信息 + b, _ := json.Marshal(rsp) + if err := w.conn.WriteMessage(websocket.TextMessage, b); err != nil { + logx.Error("发送心跳信息异常,关闭连接:", w.flag, err) + w.close() + return } } } @@ -170,16 +171,17 @@ func (w *wsConnectItem) readLoop() { case <-w.closeChan: //如果关闭了 return default: - _, data, err := w.conn.ReadMessage() - if err != nil { - logx.Error("接受信息错误:", err) - //关闭连接 - w.close() - return - } - //消息传入缓冲通道 - w.inChan <- data + } + _, data, err := w.conn.ReadMessage() + if err != nil { + logx.Error("接受信息错误:", err) + //关闭连接 + w.close() + return + } + //消息传入缓冲通道 + w.inChan <- data } } diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go index f7dfa3ec..3767bd9c 100644 --- a/server/websocket/internal/handler/rendernotifyhandler.go +++ b/server/websocket/internal/handler/rendernotifyhandler.go @@ -36,7 +36,7 @@ func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { }) return } - if time.Now().Unix()-120 > req.Time || req.Time > time.Now().Unix() { + if time.Now().Unix()-120 > req.Time /*|| req.Time > time.Now().Unix() */ { httpx.OkJsonCtx(r.Context(), w, basic.Response{ Code: basic.CodeRequestParamsErr.Code, Message: "invalid param,time is expired", From 743ea961574d446a3f6376642893b3de8d67bd85 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 19:33:59 +0800 Subject: [PATCH 09/14] fix --- .../internal/handler/datatransferhandler.go | 29 +++++++++++++++++ .../internal/handler/rendernotifyhandler.go | 32 +------------------ 2 files changed, 30 insertions(+), 31 deletions(-) diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index d981d377..04f0a7ae 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -225,6 +225,35 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { } } +// 把渲染好的数据放入outchan +func (w *wsConnectItem) setOutRenderImage(req types.RenderNotifyReq, ws wsConnectItem) { + ws.mutex.Lock() + defer ws.mutex.Unlock() + for _, notifyItem := range req.NotifyList { + renderKey := ws.getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) + //查询 + _, ok := ws.renderImage[renderKey] + if !ok { + continue + } + responseData := types.RenderImageRspMsg{ + ProductId: notifyItem.ProductId, + SizeId: notifyItem.SizeId, + TemplateId: notifyItem.TemplateId, + Source: "我是渲染资源", + } + b, _ := json.Marshal(responseData) + select { + case <-ws.closeChan: + return + case ws.outChan <- b: + logx.Info("notify send render result to out chan") + } + //删掉已经处理的渲染任务 + delete(ws.renderImage, renderKey) + } +} + // 获取需要渲染图片的map key func (w *wsConnectItem) getRenderImageMapKey(productId, sizeId, templateId int64) string { return fmt.Sprintf("%d-%d-%d", productId, sizeId, templateId) diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go index 3767bd9c..b51fee32 100644 --- a/server/websocket/internal/handler/rendernotifyhandler.go +++ b/server/websocket/internal/handler/rendernotifyhandler.go @@ -7,7 +7,6 @@ import ( "fmt" "fusenapi/constants" "fusenapi/utils/basic" - "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/rest/httpx" "net/http" "time" @@ -64,7 +63,7 @@ func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { if !ok { return false } - setOutRenderImage(req, ws) + ws.setOutRenderImage(req, ws) return true }) httpx.OkJsonCtx(r.Context(), w, basic.Response{ @@ -74,32 +73,3 @@ func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { }) } } - -// 把渲染好的数据放入outchan -func setOutRenderImage(req types.RenderNotifyReq, ws wsConnectItem) { - ws.mutex.Lock() - defer ws.mutex.Unlock() - for _, notifyItem := range req.NotifyList { - renderKey := ws.getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) - //查询 - _, ok := ws.renderImage[renderKey] - if !ok { - continue - } - responseData := types.RenderImageRspMsg{ - ProductId: notifyItem.ProductId, - SizeId: notifyItem.SizeId, - TemplateId: notifyItem.TemplateId, - Source: "我是渲染资源", - } - b, _ := json.Marshal(responseData) - select { - case <-ws.closeChan: - return - case ws.outChan <- b: - logx.Info("notify send render result to out chan") - } - //删掉已经处理的渲染任务 - delete(ws.renderImage, renderKey) - } -} From 3be83ed42401d2bda4ea75dcedaa163430b9e13f Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 19:35:52 +0800 Subject: [PATCH 10/14] fix --- .../internal/handler/datatransferhandler.go | 16 ++++++++-------- .../internal/handler/rendernotifyhandler.go | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index 04f0a7ae..7c8a6c5b 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -226,13 +226,13 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { } // 把渲染好的数据放入outchan -func (w *wsConnectItem) setOutRenderImage(req types.RenderNotifyReq, ws wsConnectItem) { - ws.mutex.Lock() - defer ws.mutex.Unlock() +func (w *wsConnectItem) setOutRenderImage(req types.RenderNotifyReq) { + w.mutex.Lock() + defer w.mutex.Unlock() for _, notifyItem := range req.NotifyList { - renderKey := ws.getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) + renderKey := w.getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) //查询 - _, ok := ws.renderImage[renderKey] + _, ok := w.renderImage[renderKey] if !ok { continue } @@ -244,13 +244,13 @@ func (w *wsConnectItem) setOutRenderImage(req types.RenderNotifyReq, ws wsConnec } b, _ := json.Marshal(responseData) select { - case <-ws.closeChan: + case <-w.closeChan: return - case ws.outChan <- b: + case w.outChan <- b: logx.Info("notify send render result to out chan") } //删掉已经处理的渲染任务 - delete(ws.renderImage, renderKey) + delete(w.renderImage, renderKey) } } diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go index b51fee32..005ee044 100644 --- a/server/websocket/internal/handler/rendernotifyhandler.go +++ b/server/websocket/internal/handler/rendernotifyhandler.go @@ -63,7 +63,7 @@ func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { if !ok { return false } - ws.setOutRenderImage(req, ws) + ws.setOutRenderImage(req) return true }) httpx.OkJsonCtx(r.Context(), w, basic.Response{ From 59edb394fa6437e8502cfb48a1905a0d0098fc8a Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 19:37:56 +0800 Subject: [PATCH 11/14] fix --- server/websocket/internal/handler/datatransferhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index 7c8a6c5b..050f84ad 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -119,6 +119,7 @@ func (w *wsConnectItem) heartbeat() { T: constants.WEBSOCKET_HEARTBEAT, D: "", } + b, _ := json.Marshal(rsp) for { time.Sleep(time.Second * 10) select { @@ -128,7 +129,6 @@ func (w *wsConnectItem) heartbeat() { } //发送心跳信息 - b, _ := json.Marshal(rsp) if err := w.conn.WriteMessage(websocket.TextMessage, b); err != nil { logx.Error("发送心跳信息异常,关闭连接:", w.flag, err) w.close() From 0657307265b8b688293772e2dfe5f0f0df9a40a0 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Tue, 25 Jul 2023 19:43:28 +0800 Subject: [PATCH 12/14] fix --- .../internal/handler/datatransferhandler.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index 050f84ad..66cde398 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -7,6 +7,7 @@ import ( "fusenapi/server/websocket/internal/logic" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" + "fusenapi/utils/auth" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/zeromicro/go-zero/core/logx" @@ -54,11 +55,11 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { defer conn.Close() rsp := types.DataTransferRsp{} // 解析JWT token,并对空用户进行判断 - /*claims, err := svcCtx.ParseJwtToken(r) + claims, err := svcCtx.ParseJwtToken(r) // 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 if err != nil { - rsp.MsgType = constants.WEBSOCKET_UNAUTH - rsp.Message = "unAuth" + rsp.T = constants.WEBSOCKET_UNAUTH + rsp.D = "unAuth" b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) return @@ -68,20 +69,20 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { _, err = auth.GetUserInfoFormMapClaims(claims) // 如果获取用户信息出错,则返回未授权的JSON响应并记录错误消息 if err != nil { - rsp.MsgType = constants.WEBSOCKET_UNAUTH - rsp.Message = "unAuth!!" + rsp.T = constants.WEBSOCKET_UNAUTH + rsp.D = "unAuth!!" b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) return } } else { // 如果claims为nil,则认为用户身份为白板用户 - rsp.MsgType = constants.WEBSOCKET_UNAUTH - rsp.Message = "unAuth!!!" + rsp.T = constants.WEBSOCKET_UNAUTH + rsp.D = "unAuth!!!" b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) return - }*/ + } //生成连接唯一标识 flag := uuid.New().String() + time.Now().Format("20060102150405") ws := wsConnectItem{ From c378c6d8daab76bbbc40b55798f08864fc218f56 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Wed, 26 Jul 2023 10:03:15 +0800 Subject: [PATCH 13/14] fix --- .../internal/handler/datatransferhandler.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index 66cde398..3e5177d7 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -54,6 +54,7 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { } defer conn.Close() rsp := types.DataTransferRsp{} + isAuth := true // 解析JWT token,并对空用户进行判断 claims, err := svcCtx.ParseJwtToken(r) // 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 @@ -62,7 +63,7 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { rsp.D = "unAuth" b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) - return + isAuth = false } if claims != nil { // 从token中获取对应的用户信息 @@ -73,7 +74,7 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { rsp.D = "unAuth!!" b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) - return + isAuth = false } } else { // 如果claims为nil,则认为用户身份为白板用户 @@ -81,7 +82,15 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { rsp.D = "unAuth!!!" b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) - return + isAuth = false + } + //不是授权的连接(10秒后关闭) + if !isAuth { + select { + case <-time.After(time.Second * 10): + conn.Close() + return + } } //生成连接唯一标识 flag := uuid.New().String() + time.Now().Format("20060102150405") From c7536d0fdb13ed22d4ed0e26a9b01b9fdf970242 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Wed, 26 Jul 2023 11:02:30 +0800 Subject: [PATCH 14/14] fix --- fs_gen_api_backend.sh | 2 +- model/gmodel/fs_product_template_tags_gen.go | 1 + .../gmodel/fs_product_template_tags_logic.go | 9 ++ .../etc/product-template-tag.yaml | 8 ++ .../internal/config/config.go | 12 +++ .../handler/getproducttemplatetagshandler.go | 35 ++++++++ .../internal/handler/routes.go | 22 +++++ .../logic/getproducttemplatetagslogic.go | 55 ++++++++++++ .../internal/svc/servicecontext.go | 61 ++++++++++++++ .../internal/types/types.go | 84 +++++++++++++++++++ .../product-template-tag.go | 36 ++++++++ server_api/product-template-tag.api | 25 ++++++ 12 files changed, 349 insertions(+), 1 deletion(-) create mode 100644 server/product-template-tag/etc/product-template-tag.yaml create mode 100644 server/product-template-tag/internal/config/config.go create mode 100644 server/product-template-tag/internal/handler/getproducttemplatetagshandler.go create mode 100644 server/product-template-tag/internal/handler/routes.go create mode 100644 server/product-template-tag/internal/logic/getproducttemplatetagslogic.go create mode 100644 server/product-template-tag/internal/svc/servicecontext.go create mode 100644 server/product-template-tag/internal/types/types.go create mode 100644 server/product-template-tag/product-template-tag.go create mode 100644 server_api/product-template-tag.api diff --git a/fs_gen_api_backend.sh b/fs_gen_api_backend.sh index 1acbf5fa..d870c95d 100755 --- a/fs_gen_api_backend.sh +++ b/fs_gen_api_backend.sh @@ -1,6 +1,6 @@ #! /bin/bash name=${1%%\\*} -options=("backend" "product-template" "product-model") +options=("backend" "product-template" "product-template-tag" "product-model") for option in "${options[@]}"; do if [ "$name" = "$option" ]; then echo $name diff --git a/model/gmodel/fs_product_template_tags_gen.go b/model/gmodel/fs_product_template_tags_gen.go index db246f6e..227af16d 100644 --- a/model/gmodel/fs_product_template_tags_gen.go +++ b/model/gmodel/fs_product_template_tags_gen.go @@ -8,6 +8,7 @@ import ( type FsProductTemplateTags struct { Id int64 `gorm:"primary_key;default:0;auto_increment;" json:"id"` // ID Title *string `gorm:"default:'';" json:"title"` // 标题 + CoverImg *string `gorm:"default:'';" json:"cover_img"` // 封面图 Status *int64 `gorm:"default:0;" json:"status"` // 状态 1:可用 CreateAt *int64 `gorm:"default:0;" json:"create_at"` // 创建时间 } diff --git a/model/gmodel/fs_product_template_tags_logic.go b/model/gmodel/fs_product_template_tags_logic.go index f5c87b39..4c26d2fb 100755 --- a/model/gmodel/fs_product_template_tags_logic.go +++ b/model/gmodel/fs_product_template_tags_logic.go @@ -23,3 +23,12 @@ func (pt *FsProductTemplateTagsModel) FindOne(ctx context.Context, id int64, fie err = db.Take(&resp).Error return resp, err } +func (pt *FsProductTemplateTagsModel) GetList(ctx context.Context, page, limit int, orderBy string) (resp []FsProductTemplateTags, err error) { + db := pt.db.WithContext(ctx).Model(&FsProductTemplateTags{}).Where("`status` = ?", 1) + if orderBy != "" { + db = db.Order(orderBy) + } + offset := (page - 1) * limit + err = db.Offset(offset).Limit(limit).Find(&resp).Error + return resp, err +} diff --git a/server/product-template-tag/etc/product-template-tag.yaml b/server/product-template-tag/etc/product-template-tag.yaml new file mode 100644 index 00000000..253828fb --- /dev/null +++ b/server/product-template-tag/etc/product-template-tag.yaml @@ -0,0 +1,8 @@ +Name: product-template-tag +Host: 0.0.0.0 +Port: 9916 +SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest +Auth: + AccessSecret: fusen2023 + AccessExpire: 2592000 + RefreshAfter: 1592000 \ No newline at end of file diff --git a/server/product-template-tag/internal/config/config.go b/server/product-template-tag/internal/config/config.go new file mode 100644 index 00000000..b5a0124e --- /dev/null +++ b/server/product-template-tag/internal/config/config.go @@ -0,0 +1,12 @@ +package config + +import ( + "fusenapi/server/product-template-tag/internal/types" + "github.com/zeromicro/go-zero/rest" +) + +type Config struct { + rest.RestConf + SourceMysql string + Auth types.Auth +} diff --git a/server/product-template-tag/internal/handler/getproducttemplatetagshandler.go b/server/product-template-tag/internal/handler/getproducttemplatetagshandler.go new file mode 100644 index 00000000..cf7cf96e --- /dev/null +++ b/server/product-template-tag/internal/handler/getproducttemplatetagshandler.go @@ -0,0 +1,35 @@ +package handler + +import ( + "net/http" + "reflect" + + "fusenapi/utils/basic" + + "fusenapi/server/product-template-tag/internal/logic" + "fusenapi/server/product-template-tag/internal/svc" + "fusenapi/server/product-template-tag/internal/types" +) + +func GetProductTemplateTagsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + + var req types.GetProductTemplateTagsReq + userinfo, err := basic.RequestParse(w, r, svcCtx, &req) + if err != nil { + return + } + + // 创建一个业务逻辑层实例 + l := logic.NewGetProductTemplateTagsLogic(r.Context(), svcCtx) + + rl := reflect.ValueOf(l) + basic.BeforeLogic(w, r, rl) + + resp := l.GetProductTemplateTags(&req, userinfo) + + if !basic.AfterLogic(w, r, rl, resp) { + basic.NormalAfterLogic(w, r, resp) + } + } +} diff --git a/server/product-template-tag/internal/handler/routes.go b/server/product-template-tag/internal/handler/routes.go new file mode 100644 index 00000000..b2c8f181 --- /dev/null +++ b/server/product-template-tag/internal/handler/routes.go @@ -0,0 +1,22 @@ +// Code generated by goctl. DO NOT EDIT. +package handler + +import ( + "net/http" + + "fusenapi/server/product-template-tag/internal/svc" + + "github.com/zeromicro/go-zero/rest" +) + +func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { + server.AddRoutes( + []rest.Route{ + { + Method: http.MethodGet, + Path: "/api/product-template/get_product_template_tags", + Handler: GetProductTemplateTagsHandler(serverCtx), + }, + }, + ) +} diff --git a/server/product-template-tag/internal/logic/getproducttemplatetagslogic.go b/server/product-template-tag/internal/logic/getproducttemplatetagslogic.go new file mode 100644 index 00000000..68c62363 --- /dev/null +++ b/server/product-template-tag/internal/logic/getproducttemplatetagslogic.go @@ -0,0 +1,55 @@ +package logic + +import ( + "fusenapi/utils/auth" + "fusenapi/utils/basic" + + "context" + + "fusenapi/server/product-template-tag/internal/svc" + "fusenapi/server/product-template-tag/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetProductTemplateTagsLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetProductTemplateTagsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetProductTemplateTagsLogic { + return &GetProductTemplateTagsLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +// 处理进入前逻辑w,r +// func (l *GetProductTemplateTagsLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) { +// } + +// 处理逻辑后 w,r 如:重定向, resp 必须重新处理 +// func (l *GetProductTemplateTagsLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) { +// // httpx.OkJsonCtx(r.Context(), w, resp) +// } + +func (l *GetProductTemplateTagsLogic) GetProductTemplateTags(req *types.GetProductTemplateTagsReq, userinfo *auth.UserInfo) (resp *basic.Response) { + if req.Limit <= 0 || req.Limit > 100 { + req.Limit = 4 + } + productTemplateTags, err := l.svcCtx.AllModels.FsProductTemplateTags.GetList(l.ctx, 1, req.Limit, "`id` DESC") + if err != nil { + logx.Error(err) + return resp.SetStatusWithMessage(basic.CodeDbSqlErr, "failed to get template tags") + } + list := make([]types.GetProductTemplateTagsRsp, 0, len(productTemplateTags)) + for _, v := range productTemplateTags { + list = append(list, types.GetProductTemplateTagsRsp{ + Tag: *v.Title, + Cover: *v.CoverImg, + }) + } + return resp.SetStatusWithMessage(basic.CodeOK, "success", list) +} diff --git a/server/product-template-tag/internal/svc/servicecontext.go b/server/product-template-tag/internal/svc/servicecontext.go new file mode 100644 index 00000000..4ebf8b31 --- /dev/null +++ b/server/product-template-tag/internal/svc/servicecontext.go @@ -0,0 +1,61 @@ +package svc + +import ( + "errors" + "fmt" + "fusenapi/server/product-template-tag/internal/config" + "net/http" + + "fusenapi/initalize" + "fusenapi/model/gmodel" + + "github.com/golang-jwt/jwt" + "gorm.io/gorm" +) + +type ServiceContext struct { + Config config.Config + + MysqlConn *gorm.DB + AllModels *gmodel.AllModelsGen +} + +func NewServiceContext(c config.Config) *ServiceContext { + + return &ServiceContext{ + Config: c, + MysqlConn: initalize.InitMysql(c.SourceMysql), + AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), + } +} + +func (svcCtx *ServiceContext) ParseJwtToken(r *http.Request) (jwt.MapClaims, error) { + AuthKey := r.Header.Get("Authorization") + if AuthKey == "" { + return nil, nil + } + AuthKey = AuthKey[7:] + + if len(AuthKey) <= 50 { + return nil, errors.New(fmt.Sprint("Error parsing token, len:", len(AuthKey))) + } + + token, err := jwt.Parse(AuthKey, func(token *jwt.Token) (interface{}, error) { + // 检查签名方法是否为 HS256 + if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { + return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"]) + } + // 返回用于验证签名的密钥 + return []byte(svcCtx.Config.Auth.AccessSecret), nil + }) + if err != nil { + return nil, errors.New(fmt.Sprint("Error parsing token:", err)) + } + + // 验证成功返回 + if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid { + return claims, nil + } + + return nil, errors.New(fmt.Sprint("Invalid token", err)) +} diff --git a/server/product-template-tag/internal/types/types.go b/server/product-template-tag/internal/types/types.go new file mode 100644 index 00000000..39866da6 --- /dev/null +++ b/server/product-template-tag/internal/types/types.go @@ -0,0 +1,84 @@ +// Code generated by goctl. DO NOT EDIT. +package types + +import ( + "fusenapi/utils/basic" +) + +type GetProductTemplateTagsReq struct { + Limit int `form:"limit"` +} + +type GetProductTemplateTagsRsp struct { + Tag string `json:"tag"` + Cover string `json:"cover"` +} + +type Request struct { +} + +type Response struct { + Code int `json:"code"` + Message string `json:"msg"` + Data interface{} `json:"data"` +} + +type Auth struct { + AccessSecret string `json:"accessSecret"` + AccessExpire int64 `json:"accessExpire"` + RefreshAfter int64 `json:"refreshAfter"` +} + +type File struct { + Filename string `fsfile:"filename"` + Header map[string][]string `fsfile:"header"` + Size int64 `fsfile:"size"` + Data []byte `fsfile:"data"` +} + +type Meta struct { + TotalCount int64 `json:"totalCount"` + PageCount int64 `json:"pageCount"` + CurrentPage int `json:"currentPage"` + PerPage int `json:"perPage"` +} + +// Set 设置Response的Code和Message值 +func (resp *Response) Set(Code int, Message string) *Response { + return &Response{ + Code: Code, + Message: Message, + } +} + +// Set 设置整个Response +func (resp *Response) SetWithData(Code int, Message string, Data interface{}) *Response { + return &Response{ + Code: Code, + Message: Message, + Data: Data, + } +} + +// SetStatus 设置默认StatusResponse(内部自定义) 默认msg, 可以带data, data只使用一个参数 +func (resp *Response) SetStatus(sr *basic.StatusResponse, data ...interface{}) *Response { + newResp := &Response{ + Code: sr.Code, + } + if len(data) == 1 { + newResp.Data = data[0] + } + return newResp +} + +// SetStatusWithMessage 设置默认StatusResponse(内部自定义) 非默认msg, 可以带data, data只使用一个参数 +func (resp *Response) SetStatusWithMessage(sr *basic.StatusResponse, msg string, data ...interface{}) *Response { + newResp := &Response{ + Code: sr.Code, + Message: msg, + } + if len(data) == 1 { + newResp.Data = data[0] + } + return newResp +} diff --git a/server/product-template-tag/product-template-tag.go b/server/product-template-tag/product-template-tag.go new file mode 100644 index 00000000..18898784 --- /dev/null +++ b/server/product-template-tag/product-template-tag.go @@ -0,0 +1,36 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "time" + + "fusenapi/utils/auth" + + "fusenapi/server/product-template-tag/internal/config" + "fusenapi/server/product-template-tag/internal/handler" + "fusenapi/server/product-template-tag/internal/svc" + + "github.com/zeromicro/go-zero/core/conf" + "github.com/zeromicro/go-zero/rest" +) + +var configFile = flag.String("f", "etc/product-template-tag.yaml", "the config file") + +func main() { + flag.Parse() + + var c config.Config + conf.MustLoad(*configFile, &c) + c.Timeout = int64(time.Second * 15) + server := rest.MustNewServer(c.RestConf, rest.WithCustomCors(auth.FsCors, func(w http.ResponseWriter) { + })) + defer server.Stop() + + ctx := svc.NewServiceContext(c) + handler.RegisterHandlers(server, ctx) + + fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) + server.Start() +} diff --git a/server_api/product-template-tag.api b/server_api/product-template-tag.api new file mode 100644 index 00000000..ae62099a --- /dev/null +++ b/server_api/product-template-tag.api @@ -0,0 +1,25 @@ +syntax = "v1" + +info ( + title: "产品模板标签服务"// TODO: add title + desc: // TODO: add description + author: "" + email: "" +) + +import "basic.api" + +service product-template-tag { + //获取产品模板标签列表 + @handler GetProductTemplateTagsHandler + get /api/product-template/get_product_template_tags(GetProductTemplateTagsReq) returns (response); +} + +//获取产品模板标签列表 +type GetProductTemplateTagsReq { + Limit int `form:"limit"` +} +type GetProductTemplateTagsRsp { + Tag string `json:"tag"` + Cover string `json:"cover"` +} \ No newline at end of file