diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index 96a65455..e07d3abe 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -4,9 +4,9 @@ import ( "encoding/json" "fmt" "fusenapi/constants" + "fusenapi/server/websocket/internal/logic" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" - "fusenapi/utils/auth" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/zeromicro/go-zero/core/logx" @@ -24,21 +24,27 @@ var ( }, } //连接map池 - mapConn = sync.Map{} + mapConnPool = sync.Map{} ) // 每个连接的连接属性 type wsConnectItem struct { - conn *websocket.Conn //websocket的连接 - closeChan chan struct{} //关闭chan - isClose bool //是否已经关闭 - flag string - inChan chan interface{} //接受消息缓冲通道 - property interface{} //属性 + conn *websocket.Conn //websocket的连接 + closeChan chan struct{} //关闭chan + isClose bool //是否已经关闭 + flag string + inChan chan []byte //接受消息缓冲通道 + outChan chan []byte //发送回客户端的消息 + mutex sync.Mutex + renderImage map[string]struct{} //需要渲染的图片 } func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + // 创建一个业务逻辑层实例 + var req types.DataTransferReq + l := logic.NewDataTransferLogic(r.Context(), svcCtx) + l.DataTransfer(&req, nil) //升级websocket conn, err := upgrade.Upgrade(w, r, nil) if err != nil { @@ -48,7 +54,7 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { defer conn.Close() rsp := types.DataTransferRsp{} // 解析JWT token,并对空用户进行判断 - claims, err := svcCtx.ParseJwtToken(r) + /*claims, err := svcCtx.ParseJwtToken(r) // 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 if err != nil { rsp.MsgType = constants.WEBSOCKET_UNAUTH @@ -75,18 +81,20 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) return - } + }*/ //生成连接唯一标识 flag := uuid.New().String() + time.Now().Format("20060102150405") ws := wsConnectItem{ - conn: conn, - flag: flag, - closeChan: make(chan struct{}, 1), - inChan: make(chan interface{}, 1000), - isClose: false, + conn: conn, + flag: flag, + closeChan: make(chan struct{}, 1), + inChan: make(chan []byte, 100), + outChan: make(chan []byte, 100), + renderImage: make(map[string]struct{}), + isClose: false, } //保存连接 - mapConn.Store(flag, ws) + mapConnPool.Store(flag, ws) defer ws.close() //把连接成功消息发回去 rsp.MsgType = constants.WEBSOCKET_CONNECT_SUCCESS @@ -95,43 +103,66 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { conn.WriteMessage(websocket.TextMessage, b) //循环读客户端信息 go ws.readLoop() + //循环把数据发送给客户端 + go ws.writeLoop() //推消息到云渲染 go ws.sendLoop() //心跳 - for { - time.Sleep(time.Second * 3) - select { - case <-ws.closeChan: + ws.heartbeat() + + } +} + +// 心跳 +func (w *wsConnectItem) heartbeat() { + rsp := types.DataTransferRsp{ + MsgType: constants.WEBSOCKET_HEARTBEAT, + Message: "", + } + for { + time.Sleep(time.Second * 10) + select { + case <-w.closeChan: + return + default: + //发送心跳信息 + b, _ := json.Marshal(rsp) + if err := w.conn.WriteMessage(websocket.TextMessage, b); err != nil { + logx.Error("发送心跳信息异常,关闭连接:", w.flag, err) + w.close() return - default: - //发送心跳信息 - rsp.MsgType = constants.WEBSOCKET_HEARTBEAT - rsp.Message = "heartbeat" - b, _ = json.Marshal(rsp) - if err = conn.WriteMessage(websocket.TextMessage, b); err != nil { - logx.Error("发送心跳信息异常,关闭连接:", flag, err) - ws.close() - return - } } } - } } // 关闭连接 func (w *wsConnectItem) close() { + w.mutex.Lock() + defer w.mutex.Unlock() logx.Info("websocket:", w.flag, " is closing...") w.conn.Close() - mapConn.Delete(w.flag) + mapConnPool.Delete(w.flag) if !w.isClose { w.isClose = true close(w.closeChan) + close(w.outChan) close(w.inChan) } logx.Info("websocket:", w.flag, " is closed") } +func (w *wsConnectItem) writeLoop() { + for { + select { + case <-w.closeChan: //如果关闭了 + return + case data := <-w.outChan: + w.conn.WriteMessage(websocket.TextMessage, data) + } + } +} + // 接受客户端发来的消息 func (w *wsConnectItem) readLoop() { for { @@ -159,7 +190,39 @@ func (w *wsConnectItem) sendLoop() { case <-w.closeChan: return case data := <-w.inChan: - fmt.Println(data) + w.dealwithReciveData(data) } } } + +// 处理接受到的数据 +func (w *wsConnectItem) dealwithReciveData(data []byte) { + var parseInfo types.DataTransferReq + if err := json.Unmarshal(data, &parseInfo); err != nil { + logx.Error("invalid format of websocket message") + return + } + switch parseInfo.MsgType { + case constants.WEBSOCKET_RENDER_IMAGE: //图片渲染 + var renderImageData []types.RenderImageReqMsg + if err := json.Unmarshal([]byte(parseInfo.Message), &renderImageData); err != nil { + logx.Error("invalid format of websocket render image message", err) + return + } + logx.Info("收到请求云渲染图片数据:", renderImageData) + w.mutex.Lock() + defer w.mutex.Unlock() + //把需要渲染的图片加进去 + for _, v := range renderImageData { + key := getRenderImageMapKey(v.ProductId, v.SizeId, v.TemplateId) + w.renderImage[key] = struct{}{} + } + default: + + } +} + +// 获取需要渲染图片的map key +func getRenderImageMapKey(productId, sizeId, templateId int64) string { + return fmt.Sprintf("%d-%d-%d", productId, sizeId, templateId) +} diff --git a/server/websocket/internal/handler/rendernotifyhandler.go b/server/websocket/internal/handler/rendernotifyhandler.go new file mode 100644 index 00000000..9c2eb28d --- /dev/null +++ b/server/websocket/internal/handler/rendernotifyhandler.go @@ -0,0 +1,62 @@ +package handler + +import ( + "encoding/json" + "fusenapi/utils/basic" + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/rest/httpx" + "net/http" + + "fusenapi/server/websocket/internal/svc" + "fusenapi/server/websocket/internal/types" +) + +func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.RenderNotifyReq + _, err := basic.RequestParse(w, r, svcCtx, &req) + if err != nil { + return + } + mapConnPool.Range(func(key, value any) bool { + ws, ok := value.(wsConnectItem) + if !ok { + return false + } + setOutRenderImage(req, ws) + return true + }) + httpx.OkJsonCtx(r.Context(), w, basic.Response{ + Code: 200, + Message: "success", + Data: nil, + }) + } +} + +// 把渲染好的数据放入outchan +func setOutRenderImage(req types.RenderNotifyReq, ws wsConnectItem) { + ws.mutex.Lock() + defer ws.mutex.Unlock() + for _, notifyItem := range req.NotifyList { + renderKey := getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) + //加载并删除 + _, ok := ws.renderImage[renderKey] + if !ok { + continue + } + responseData := types.RenderImageRspMsg{ + ProductId: notifyItem.ProductId, + SizeId: notifyItem.SizeId, + TemplateId: notifyItem.TemplateId, + Source: "我是渲染资源", + } + b, _ := json.Marshal(responseData) + select { + case <-ws.closeChan: + return + case ws.outChan <- b: + logx.Info("notify send render result to out chan") + } + } +} diff --git a/server/websocket/internal/handler/routes.go b/server/websocket/internal/handler/routes.go index fd3200e4..859ecc5b 100644 --- a/server/websocket/internal/handler/routes.go +++ b/server/websocket/internal/handler/routes.go @@ -17,6 +17,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/api/websocket/data_transfer", Handler: DataTransferHandler(serverCtx), }, + { + Method: http.MethodPost, + Path: "/api/websocket/render_notify", + Handler: RenderNotifyHandler(serverCtx), + }, }, ) } diff --git a/server/websocket/internal/logic/rendernotifylogic.go b/server/websocket/internal/logic/rendernotifylogic.go new file mode 100644 index 00000000..49803d29 --- /dev/null +++ b/server/websocket/internal/logic/rendernotifylogic.go @@ -0,0 +1,43 @@ +package logic + +import ( + "fusenapi/utils/auth" + "fusenapi/utils/basic" + + "context" + + "fusenapi/server/websocket/internal/svc" + "fusenapi/server/websocket/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type RenderNotifyLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewRenderNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *RenderNotifyLogic { + return &RenderNotifyLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +// 处理进入前逻辑w,r +// func (l *RenderNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) { +// } + +// 处理逻辑后 w,r 如:重定向, resp 必须重新处理 +// func (l *RenderNotifyLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) { +// // httpx.OkJsonCtx(r.Context(), w, resp) +// } + +func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { + // 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data) + // userinfo 传入值时, 一定不为null + + return resp.SetStatus(basic.CodeOK) +} diff --git a/server/websocket/internal/types/types.go b/server/websocket/internal/types/types.go index 06201d39..35df171d 100644 --- a/server/websocket/internal/types/types.go +++ b/server/websocket/internal/types/types.go @@ -6,13 +6,37 @@ import ( ) type DataTransferReq struct { - MsgType string `json:"msg_type"` //消息类型 - Message interface{} `json:"message"` //传递的消息 + MsgType string `json:"msg_type"` //消息类型 + Message string `json:"message"` //传递的消息 } type DataTransferRsp struct { - MsgType string `json:"msg_type"` //消息类型 - Message interface{} `json:"message"` //传递的消息 + MsgType string `json:"msg_type"` //消息类型 + Message string `json:"message"` //传递的消息 +} + +type RenderImageReqMsg struct { + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` +} + +type RenderImageRspMsg struct { + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` + Source string `json:"source"` +} + +type RenderNotifyReq struct { + NotifyList []NotifyItem `json:"notify_list"` +} + +type NotifyItem struct { + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` + Source string `json:"source"` } type Request struct { diff --git a/server_api/websocket.api b/server_api/websocket.api index 786e67e7..ee90cdcc 100644 --- a/server_api/websocket.api +++ b/server_api/websocket.api @@ -12,14 +12,38 @@ service websocket { //websocket数据交互 @handler DataTransferHandler get /api/websocket/data_transfer(DataTransferReq) returns (response); + //渲染完了通知接口 + @handler RenderNotifyHandler + post /api/websocket/render_notify(RenderNotifyReq) returns (response); } //websocket数据交互 type DataTransferReq { - MsgType string `json:"msg_type"` //消息类型 - Message interface{} `json:"message"` //传递的消息 + MsgType string `json:"msg_type"` //消息类型 + Message string `json:"message"` //传递的消息 } type DataTransferRsp { - MsgType string `json:"msg_type"` //消息类型 - Message interface{} `json:"message"` //传递的消息 + MsgType string `json:"msg_type"` //消息类型 + Message string `json:"message"` //传递的消息 +} +type RenderImageReqMsg { //websocket接受需要云渲染的图片 + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` +} +type RenderImageRspMsg { //websocket发送渲染完的数据 + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` + Source string `json:"source"` +} +//渲染完了通知接口 +type RenderNotifyReq { + NotifyList []NotifyItem `json:"notify_list"` +} +type NotifyItem { + ProductId int64 `json:"product_id"` + SizeId int64 `json:"size_id"` + TemplateId int64 `json:"template_id"` + Source string `json:"source"` } \ No newline at end of file