package handler import ( "encoding/json" "fmt" "fusenapi/constants" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" "fusenapi/utils/auth" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/zeromicro/go-zero/core/logx" "net/http" "sync" "time" ) var ( //升级 upgrade = websocket.Upgrader{ //允许跨域 CheckOrigin: func(r *http.Request) bool { return true }, } //连接map池 mapConn = sync.Map{} ) // 每个连接的连接属性 type wsConnectItem struct { conn *websocket.Conn //websocket的连接 closeChan chan struct{} //关闭chan isClose bool //是否已经关闭 flag string inChan chan interface{} //接受消息缓冲通道 property interface{} //属性 } func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { //升级websocket conn, err := upgrade.Upgrade(w, r, nil) if err != nil { logx.Error("http upgrade websocket err:", err) return } defer conn.Close() rsp := types.DataTransferRsp{} // 解析JWT token,并对空用户进行判断 claims, err := svcCtx.ParseJwtToken(r) // 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 if err != nil { rsp.MsgType = constants.WEBSOCKET_UNAUTH rsp.Message = "unAuth" b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) return } if claims != nil { // 从token中获取对应的用户信息 _, err = auth.GetUserInfoFormMapClaims(claims) // 如果获取用户信息出错,则返回未授权的JSON响应并记录错误消息 if err != nil { rsp.MsgType = constants.WEBSOCKET_UNAUTH rsp.Message = "unAuth!!" b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) return } } else { // 如果claims为nil,则认为用户身份为白板用户 rsp.MsgType = constants.WEBSOCKET_UNAUTH rsp.Message = "unAuth!!!" b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) return } //生成连接唯一标识 flag := uuid.New().String() + time.Now().Format("20060102150405") ws := wsConnectItem{ conn: conn, flag: flag, closeChan: make(chan struct{}, 1), inChan: make(chan interface{}, 1000), isClose: false, } //保存连接 mapConn.Store(flag, ws) defer ws.close() //把连接成功消息发回去 rsp.MsgType = constants.WEBSOCKET_CONNECT_SUCCESS rsp.Message = flag b, _ := json.Marshal(rsp) conn.WriteMessage(websocket.TextMessage, b) //循环读客户端信息 go ws.readLoop() //推消息到云渲染 go ws.sendLoop() //心跳 for { time.Sleep(time.Second * 3) select { case <-ws.closeChan: return default: //发送心跳信息 rsp.MsgType = constants.WEBSOCKET_HEARTBEAT rsp.Message = "heartbeat" b, _ = json.Marshal(rsp) if err = conn.WriteMessage(websocket.TextMessage, b); err != nil { logx.Error("发送心跳信息异常,关闭连接:", flag, err) ws.close() return } } } } } // 关闭连接 func (w *wsConnectItem) close() { logx.Info("websocket:", w.flag, " is closing...") w.conn.Close() mapConn.Delete(w.flag) if !w.isClose { w.isClose = true close(w.closeChan) close(w.inChan) } logx.Info("websocket:", w.flag, " is closed") } // 接受客户端发来的消息 func (w *wsConnectItem) readLoop() { for { select { case <-w.closeChan: //如果关闭了 return default: _, data, err := w.conn.ReadMessage() if err != nil { logx.Error("接受信息错误:", err) //关闭连接 w.close() return } //消息传入缓冲通道 w.inChan <- data } } } // 把收到的消息发往不同的地方处理 func (w *wsConnectItem) sendLoop() { for { select { case <-w.closeChan: return case data := <-w.inChan: fmt.Println(data) } } }