diff --git a/constants/websocket.go b/constants/websocket.go index 95bea53d..7fca9751 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -4,8 +4,12 @@ type websocket string // websocket消息类型 const ( + //鉴权失败 + WEBSOCKET_UNAUTH = "unAuth" //ws连接成功 WEBSOCKET_CONNECT_SUCCESS = "connect-success" + //心跳信息 + WEBSOCKET_HEARTBEAT = "heartbeat" //图片渲染 WEBSOCKET_RENDER_IMAGE = "render-image" ) diff --git a/server/websocket/etc/websocket.yaml b/server/websocket/etc/websocket.yaml index d22f03a7..7d2b68d7 100644 --- a/server/websocket/etc/websocket.yaml +++ b/server/websocket/etc/websocket.yaml @@ -1,6 +1,6 @@ Name: websocket Host: 0.0.0.0 -Port: 8888 +Port: 9914 SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest Auth: AccessSecret: fusen2023 diff --git a/server/websocket/internal/config/config.go b/server/websocket/internal/config/config.go index b24bb7d7..85ee62f1 100644 --- a/server/websocket/internal/config/config.go +++ b/server/websocket/internal/config/config.go @@ -1,6 +1,9 @@ package config -import "github.com/zeromicro/go-zero/rest" +import ( + "fusenapi/server/websocket/internal/types" + "github.com/zeromicro/go-zero/rest" +) type Config struct { rest.RestConf diff --git a/server/websocket/internal/handler/datatransferhandler.go b/server/websocket/internal/handler/datatransferhandler.go index e1458960..96a65455 100644 --- a/server/websocket/internal/handler/datatransferhandler.go +++ b/server/websocket/internal/handler/datatransferhandler.go @@ -2,6 +2,7 @@ package handler import ( "encoding/json" + "fmt" "fusenapi/constants" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" @@ -28,18 +29,32 @@ var ( // 每个连接的连接属性 type wsConnectItem struct { - conn *websocket.Conn //websocket的连接 - property interface{} //属性 + conn *websocket.Conn //websocket的连接 + closeChan chan struct{} //关闭chan + isClose bool //是否已经关闭 + flag string + inChan chan interface{} //接受消息缓冲通道 + property interface{} //属性 } func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + //升级websocket + conn, err := upgrade.Upgrade(w, r, nil) + if err != nil { + logx.Error("http upgrade websocket err:", err) + return + } + defer conn.Close() + rsp := types.DataTransferRsp{} // 解析JWT token,并对空用户进行判断 claims, err := svcCtx.ParseJwtToken(r) // 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 if err != nil { - logx.Info("unauthorized:", err.Error()) // 记录错误日志 - w.Write([]byte("connect failed:unauthorized")) + rsp.MsgType = constants.WEBSOCKET_UNAUTH + rsp.Message = "unAuth" + b, _ := json.Marshal(rsp) + _ = conn.WriteMessage(websocket.TextMessage, b) return } if claims != nil { @@ -47,37 +62,104 @@ func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { _, err = auth.GetUserInfoFormMapClaims(claims) // 如果获取用户信息出错,则返回未授权的JSON响应并记录错误消息 if err != nil { + rsp.MsgType = constants.WEBSOCKET_UNAUTH + rsp.Message = "unAuth!!" + b, _ := json.Marshal(rsp) + _ = conn.WriteMessage(websocket.TextMessage, b) return } } else { // 如果claims为nil,则认为用户身份为白板用户 - w.Write([]byte("connect failed:unauthorized!!")) - return - } - //升级websocket - conn, err := upgrade.Upgrade(w, r, r.Header) - if err != nil { - logx.Error("http upgrade websocket err:", err) - w.Write([]byte("http upgrade websocket err")) + rsp.MsgType = constants.WEBSOCKET_UNAUTH + rsp.Message = "unAuth!!!" + b, _ := json.Marshal(rsp) + _ = conn.WriteMessage(websocket.TextMessage, b) return } //生成连接唯一标识 - uniqueId := uuid.New().String() + time.Now().Format("20060102150405") - c := wsConnectItem{ - conn: conn, + flag := uuid.New().String() + time.Now().Format("20060102150405") + ws := wsConnectItem{ + conn: conn, + flag: flag, + closeChan: make(chan struct{}, 1), + inChan: make(chan interface{}, 1000), + isClose: false, } //保存连接 - mapConn.Store(uniqueId, c) - //把uniqueId传回去 - rsp := types.DataTransferRsp{ - MsgType: constants.WEBSOCKET_CONNECT_SUCCESS, - Message: uniqueId, - } + mapConn.Store(flag, ws) + defer ws.close() + //把连接成功消息发回去 + rsp.MsgType = constants.WEBSOCKET_CONNECT_SUCCESS + rsp.Message = flag b, _ := json.Marshal(rsp) conn.WriteMessage(websocket.TextMessage, b) + //循环读客户端信息 + go ws.readLoop() + //推消息到云渲染 + go ws.sendLoop() + //心跳 + for { + time.Sleep(time.Second * 3) + select { + case <-ws.closeChan: + return + default: + //发送心跳信息 + rsp.MsgType = constants.WEBSOCKET_HEARTBEAT + rsp.Message = "heartbeat" + b, _ = json.Marshal(rsp) + if err = conn.WriteMessage(websocket.TextMessage, b); err != nil { + logx.Error("发送心跳信息异常,关闭连接:", flag, err) + ws.close() + return + } + } + } + } } -func (w *wsConnectItem) Readloop() { - +// 关闭连接 +func (w *wsConnectItem) close() { + logx.Info("websocket:", w.flag, " is closing...") + w.conn.Close() + mapConn.Delete(w.flag) + if !w.isClose { + w.isClose = true + close(w.closeChan) + close(w.inChan) + } + logx.Info("websocket:", w.flag, " is closed") +} + +// 接受客户端发来的消息 +func (w *wsConnectItem) readLoop() { + for { + select { + case <-w.closeChan: //如果关闭了 + return + default: + _, data, err := w.conn.ReadMessage() + if err != nil { + logx.Error("接受信息错误:", err) + //关闭连接 + w.close() + return + } + //消息传入缓冲通道 + w.inChan <- data + } + } +} + +// 把收到的消息发往不同的地方处理 +func (w *wsConnectItem) sendLoop() { + for { + select { + case <-w.closeChan: + return + case data := <-w.inChan: + fmt.Println(data) + } + } }