fix
This commit is contained in:
		
							parent
							
								
									e05fcd8248
								
							
						
					
					
						commit
						999dc36e76
					
				| @ -1,270 +1,15 @@ | |||||||
| package handler | package handler | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"encoding/json" |  | ||||||
| 	"fmt" |  | ||||||
| 	"fusenapi/constants" |  | ||||||
| 	"fusenapi/server/websocket/internal/logic" | 	"fusenapi/server/websocket/internal/logic" | ||||||
| 	"fusenapi/server/websocket/internal/svc" | 	"fusenapi/server/websocket/internal/svc" | ||||||
| 	"fusenapi/server/websocket/internal/types" |  | ||||||
| 	"fusenapi/utils/auth" |  | ||||||
| 	"github.com/google/uuid" |  | ||||||
| 	"github.com/gorilla/websocket" |  | ||||||
| 	"github.com/zeromicro/go-zero/core/logx" |  | ||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"sync" |  | ||||||
| 	"time" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var ( |  | ||||||
| 	//升级 |  | ||||||
| 	upgrade = websocket.Upgrader{ |  | ||||||
| 		//允许跨域 |  | ||||||
| 		CheckOrigin: func(r *http.Request) bool { |  | ||||||
| 			return true |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 	//连接map池 |  | ||||||
| 	mapConnPool = sync.Map{} |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| // 每个连接的连接属性 |  | ||||||
| type wsConnectItem struct { |  | ||||||
| 	conn        *websocket.Conn //websocket的连接 |  | ||||||
| 	closeChan   chan struct{}   //关闭chan |  | ||||||
| 	isClose     bool            //是否已经关闭 |  | ||||||
| 	flag        string |  | ||||||
| 	inChan      chan []byte //接受消息缓冲通道 |  | ||||||
| 	outChan     chan []byte //发送回客户端的消息 |  | ||||||
| 	mutex       sync.Mutex |  | ||||||
| 	renderImage map[string]struct{} //需要渲染的图片 |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { | func DataTransferHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { | ||||||
| 	return func(w http.ResponseWriter, r *http.Request) { | 	return func(w http.ResponseWriter, r *http.Request) { | ||||||
| 		// 创建一个业务逻辑层实例 | 		// 创建一个业务逻辑层实例 | ||||||
| 		var req types.DataTransferReq |  | ||||||
| 		l := logic.NewDataTransferLogic(r.Context(), svcCtx) | 		l := logic.NewDataTransferLogic(r.Context(), svcCtx) | ||||||
| 		l.DataTransfer(&req, nil) | 		l.DataTransfer(svcCtx, w, r) | ||||||
| 		//升级websocket |  | ||||||
| 		conn, err := upgrade.Upgrade(w, r, nil) |  | ||||||
| 		if err != nil { |  | ||||||
| 			logx.Error("http upgrade websocket err:", err) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		defer conn.Close() |  | ||||||
| 		rsp := types.DataTransferRsp{} |  | ||||||
| 		isAuth := true |  | ||||||
| 		// 解析JWT token,并对空用户进行判断 |  | ||||||
| 		claims, err := svcCtx.ParseJwtToken(r) |  | ||||||
| 		// 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 |  | ||||||
| 		if err != nil { |  | ||||||
| 			rsp.T = constants.WEBSOCKET_UNAUTH |  | ||||||
| 			rsp.D = "unAuth" |  | ||||||
| 			b, _ := json.Marshal(rsp) |  | ||||||
| 			_ = conn.WriteMessage(websocket.TextMessage, b) |  | ||||||
| 			isAuth = false |  | ||||||
| 		} |  | ||||||
| 		if claims != nil { |  | ||||||
| 			// 从token中获取对应的用户信息 |  | ||||||
| 			_, err = auth.GetUserInfoFormMapClaims(claims) |  | ||||||
| 			// 如果获取用户信息出错,则返回未授权的JSON响应并记录错误消息 |  | ||||||
| 			if err != nil { |  | ||||||
| 				rsp.T = constants.WEBSOCKET_UNAUTH |  | ||||||
| 				rsp.D = "unAuth!!" |  | ||||||
| 				b, _ := json.Marshal(rsp) |  | ||||||
| 				_ = conn.WriteMessage(websocket.TextMessage, b) |  | ||||||
| 				isAuth = false |  | ||||||
| 			} |  | ||||||
| 		} else { |  | ||||||
| 			// 如果claims为nil,则认为用户身份为白板用户 |  | ||||||
| 			rsp.T = constants.WEBSOCKET_UNAUTH |  | ||||||
| 			rsp.D = "unAuth!!!" |  | ||||||
| 			b, _ := json.Marshal(rsp) |  | ||||||
| 			_ = conn.WriteMessage(websocket.TextMessage, b) |  | ||||||
| 			isAuth = false |  | ||||||
| 		} |  | ||||||
| 		//不是授权的连接(10秒后关闭) |  | ||||||
| 		if !isAuth { |  | ||||||
| 			select { |  | ||||||
| 			case <-time.After(time.Second * 10): |  | ||||||
| 				conn.Close() |  | ||||||
| 				return |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 		//生成连接唯一标识 |  | ||||||
| 		flag := uuid.New().String() + time.Now().Format("20060102150405") |  | ||||||
| 		ws := wsConnectItem{ |  | ||||||
| 			conn:        conn, |  | ||||||
| 			flag:        flag, |  | ||||||
| 			closeChan:   make(chan struct{}, 1), |  | ||||||
| 			inChan:      make(chan []byte, 100), |  | ||||||
| 			outChan:     make(chan []byte, 100), |  | ||||||
| 			renderImage: make(map[string]struct{}), |  | ||||||
| 			isClose:     false, |  | ||||||
| 		} |  | ||||||
| 		//保存连接 |  | ||||||
| 		mapConnPool.Store(flag, ws) |  | ||||||
| 		defer ws.close() |  | ||||||
| 		//把连接成功消息发回去 |  | ||||||
| 		rsp.T = constants.WEBSOCKET_CONNECT_SUCCESS |  | ||||||
| 		rsp.D = flag |  | ||||||
| 		b, _ := json.Marshal(rsp) |  | ||||||
| 		conn.WriteMessage(websocket.TextMessage, b) |  | ||||||
| 		//循环读客户端信息 |  | ||||||
| 		go ws.readLoop() |  | ||||||
| 		//循环把数据发送给客户端 |  | ||||||
| 		go ws.writeLoop() |  | ||||||
| 		//推消息到云渲染 |  | ||||||
| 		go ws.sendLoop() |  | ||||||
| 		//心跳 |  | ||||||
| 		ws.heartbeat() |  | ||||||
| 
 |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // 心跳 |  | ||||||
| func (w *wsConnectItem) heartbeat() { |  | ||||||
| 	rsp := types.DataTransferRsp{ |  | ||||||
| 		T: constants.WEBSOCKET_HEARTBEAT, |  | ||||||
| 		D: "", |  | ||||||
| 	} |  | ||||||
| 	b, _ := json.Marshal(rsp) |  | ||||||
| 	for { |  | ||||||
| 		time.Sleep(time.Second * 10) |  | ||||||
| 		select { |  | ||||||
| 		case <-w.closeChan: |  | ||||||
| 			return |  | ||||||
| 		default: |  | ||||||
| 
 |  | ||||||
| 		} |  | ||||||
| 		//发送心跳信息 |  | ||||||
| 		if err := w.conn.WriteMessage(websocket.TextMessage, b); err != nil { |  | ||||||
| 			logx.Error("发送心跳信息异常,关闭连接:", w.flag, err) |  | ||||||
| 			w.close() |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // 关闭连接 |  | ||||||
| func (w *wsConnectItem) close() { |  | ||||||
| 	w.mutex.Lock() |  | ||||||
| 	defer w.mutex.Unlock() |  | ||||||
| 	logx.Info("websocket:", w.flag, " is closing...") |  | ||||||
| 	w.conn.Close() |  | ||||||
| 	mapConnPool.Delete(w.flag) |  | ||||||
| 	if !w.isClose { |  | ||||||
| 		w.isClose = true |  | ||||||
| 		close(w.closeChan) |  | ||||||
| 		close(w.outChan) |  | ||||||
| 		close(w.inChan) |  | ||||||
| 	} |  | ||||||
| 	logx.Info("websocket:", w.flag, " is closed") |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (w *wsConnectItem) writeLoop() { |  | ||||||
| 	for { |  | ||||||
| 		select { |  | ||||||
| 		case <-w.closeChan: //如果关闭了 |  | ||||||
| 			return |  | ||||||
| 		case data := <-w.outChan: |  | ||||||
| 			w.conn.WriteMessage(websocket.TextMessage, data) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // 接受客户端发来的消息 |  | ||||||
| func (w *wsConnectItem) readLoop() { |  | ||||||
| 	for { |  | ||||||
| 		select { |  | ||||||
| 		case <-w.closeChan: //如果关闭了 |  | ||||||
| 			return |  | ||||||
| 		default: |  | ||||||
| 
 |  | ||||||
| 		} |  | ||||||
| 		_, data, err := w.conn.ReadMessage() |  | ||||||
| 		if err != nil { |  | ||||||
| 			logx.Error("接受信息错误:", err) |  | ||||||
| 			//关闭连接 |  | ||||||
| 			w.close() |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		//消息传入缓冲通道 |  | ||||||
| 		w.inChan <- data |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // 把收到的消息发往不同的地方处理 |  | ||||||
| func (w *wsConnectItem) sendLoop() { |  | ||||||
| 	for { |  | ||||||
| 		select { |  | ||||||
| 		case <-w.closeChan: |  | ||||||
| 			return |  | ||||||
| 		case data := <-w.inChan: |  | ||||||
| 			w.dealwithReciveData(data) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // 处理接受到的数据 |  | ||||||
| func (w *wsConnectItem) dealwithReciveData(data []byte) { |  | ||||||
| 	var parseInfo types.DataTransferReq |  | ||||||
| 	if err := json.Unmarshal(data, &parseInfo); err != nil { |  | ||||||
| 		logx.Error("invalid format of websocket message") |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	switch parseInfo.T { |  | ||||||
| 	//图片渲染 |  | ||||||
| 	case constants.WEBSOCKET_RENDER_IMAGE: |  | ||||||
| 		var renderImageData []types.RenderImageReqMsg |  | ||||||
| 		if err := json.Unmarshal([]byte(parseInfo.D), &renderImageData); err != nil { |  | ||||||
| 			logx.Error("invalid format of websocket render image message", err) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		logx.Info("收到请求云渲染图片数据:", renderImageData) |  | ||||||
| 		w.mutex.Lock() |  | ||||||
| 		defer w.mutex.Unlock() |  | ||||||
| 		//把需要渲染的图片加进去 |  | ||||||
| 		for _, v := range renderImageData { |  | ||||||
| 			key := w.getRenderImageMapKey(v.ProductId, v.SizeId, v.TemplateId) |  | ||||||
| 			w.renderImage[key] = struct{}{} |  | ||||||
| 		} |  | ||||||
| 	default: |  | ||||||
| 
 |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // 把渲染好的数据放入outchan |  | ||||||
| func (w *wsConnectItem) setOutRenderImage(req types.RenderNotifyReq) { |  | ||||||
| 	w.mutex.Lock() |  | ||||||
| 	defer w.mutex.Unlock() |  | ||||||
| 	for _, notifyItem := range req.NotifyList { |  | ||||||
| 		renderKey := w.getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) |  | ||||||
| 		//查询 |  | ||||||
| 		_, ok := w.renderImage[renderKey] |  | ||||||
| 		if !ok { |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 		responseData := types.RenderImageRspMsg{ |  | ||||||
| 			ProductId:  notifyItem.ProductId, |  | ||||||
| 			SizeId:     notifyItem.SizeId, |  | ||||||
| 			TemplateId: notifyItem.TemplateId, |  | ||||||
| 			Source:     "我是渲染资源", |  | ||||||
| 		} |  | ||||||
| 		b, _ := json.Marshal(responseData) |  | ||||||
| 		select { |  | ||||||
| 		case <-w.closeChan: |  | ||||||
| 			return |  | ||||||
| 		case w.outChan <- b: |  | ||||||
| 			logx.Info("notify send render result to out chan") |  | ||||||
| 		} |  | ||||||
| 		//删掉已经处理的渲染任务 |  | ||||||
| 		delete(w.renderImage, renderKey) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // 获取需要渲染图片的map key |  | ||||||
| func (w *wsConnectItem) getRenderImageMapKey(productId, sizeId, templateId int64) string { |  | ||||||
| 	return fmt.Sprintf("%d-%d-%d", productId, sizeId, templateId) |  | ||||||
| } |  | ||||||
|  | |||||||
| @ -1,18 +1,12 @@ | |||||||
| package handler | package handler | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"crypto/sha256" | 	"fusenapi/server/websocket/internal/logic" | ||||||
| 	"encoding/hex" |  | ||||||
| 	"encoding/json" |  | ||||||
| 	"fmt" |  | ||||||
| 	"fusenapi/constants" |  | ||||||
| 	"fusenapi/utils/basic" |  | ||||||
| 	"github.com/zeromicro/go-zero/rest/httpx" |  | ||||||
| 	"net/http" |  | ||||||
| 	"time" |  | ||||||
| 
 |  | ||||||
| 	"fusenapi/server/websocket/internal/svc" | 	"fusenapi/server/websocket/internal/svc" | ||||||
| 	"fusenapi/server/websocket/internal/types" | 	"fusenapi/server/websocket/internal/types" | ||||||
|  | 	"fusenapi/utils/basic" | ||||||
|  | 	"net/http" | ||||||
|  | 	"reflect" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { | func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { | ||||||
| @ -20,56 +14,18 @@ func RenderNotifyHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { | |||||||
| 		var req types.RenderNotifyReq | 		var req types.RenderNotifyReq | ||||||
| 		_, err := basic.RequestParse(w, r, svcCtx, &req) | 		_, err := basic.RequestParse(w, r, svcCtx, &req) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			httpx.OkJsonCtx(r.Context(), w, basic.Response{ |  | ||||||
| 				Code:    basic.CodeRequestParamsErr.Code, |  | ||||||
| 				Message: "err param", |  | ||||||
| 				Data:    nil, |  | ||||||
| 			}) |  | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
| 		if len(req.NotifyList) == 0 { | 		// 创建一个业务逻辑层实例 | ||||||
| 			httpx.OkJsonCtx(r.Context(), w, basic.Response{ | 		l := logic.NewRenderNotifyLogic(r.Context(), svcCtx) | ||||||
| 				Code:    basic.CodeRequestParamsErr.Code, | 
 | ||||||
| 				Message: "invalid param,notify list is empty", | 		rl := reflect.ValueOf(l) | ||||||
| 				Data:    nil, | 		basic.BeforeLogic(w, r, rl) | ||||||
| 			}) | 
 | ||||||
| 			return | 		resp := l.RenderNotify(&req) | ||||||
| 		} | 
 | ||||||
| 		if time.Now().Unix()-120 > req.Time /*|| req.Time > time.Now().Unix() */ { | 		if !basic.AfterLogic(w, r, rl, resp) { | ||||||
| 			httpx.OkJsonCtx(r.Context(), w, basic.Response{ | 			basic.NormalAfterLogic(w, r, resp) | ||||||
| 				Code:    basic.CodeRequestParamsErr.Code, | 		} | ||||||
| 				Message: "invalid param,time is expired", |  | ||||||
| 				Data:    nil, |  | ||||||
| 			}) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		//验证签名 sha256 |  | ||||||
| 		notifyByte, _ := json.Marshal(req.NotifyList) |  | ||||||
| 		h := sha256.New() |  | ||||||
| 		h.Write([]byte(fmt.Sprintf(constants.RENDER_NOTIFY_SIGN_KEY, string(notifyByte), req.Time))) |  | ||||||
| 		signHex := h.Sum(nil) |  | ||||||
| 		sign := hex.EncodeToString(signHex) |  | ||||||
| 		if req.Sign != sign { |  | ||||||
| 			httpx.OkJsonCtx(r.Context(), w, basic.Response{ |  | ||||||
| 				Code:    basic.CodeRequestParamsErr.Code, |  | ||||||
| 				Message: "invalid sign", |  | ||||||
| 				Data:    nil, |  | ||||||
| 			}) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		//遍历链接 |  | ||||||
| 		mapConnPool.Range(func(key, value any) bool { |  | ||||||
| 			ws, ok := value.(wsConnectItem) |  | ||||||
| 			if !ok { |  | ||||||
| 				return false |  | ||||||
| 			} |  | ||||||
| 			ws.setOutRenderImage(req) |  | ||||||
| 			return true |  | ||||||
| 		}) |  | ||||||
| 		httpx.OkJsonCtx(r.Context(), w, basic.Response{ |  | ||||||
| 			Code:    200, |  | ||||||
| 			Message: "success", |  | ||||||
| 			Data:    nil, |  | ||||||
| 		}) |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | |||||||
| @ -1,14 +1,20 @@ | |||||||
| package logic | package logic | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"fusenapi/utils/auth" | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"fusenapi/constants" | ||||||
|  | 	"fusenapi/server/websocket/internal/types" | ||||||
| 	"fusenapi/utils/basic" | 	"fusenapi/utils/basic" | ||||||
|  | 	"github.com/google/uuid" | ||||||
|  | 	"github.com/gorilla/websocket" | ||||||
|  | 	"net/http" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"context" | 	"context" | ||||||
| 
 | 
 | ||||||
| 	"fusenapi/server/websocket/internal/svc" | 	"fusenapi/server/websocket/internal/svc" | ||||||
| 	"fusenapi/server/websocket/internal/types" |  | ||||||
| 
 |  | ||||||
| 	"github.com/zeromicro/go-zero/core/logx" | 	"github.com/zeromicro/go-zero/core/logx" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -26,18 +32,250 @@ func NewDataTransferLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Data | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // 处理进入前逻辑w,r | var ( | ||||||
| // func (l *DataTransferLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) { | 	//升级 | ||||||
| // } | 	upgrade = websocket.Upgrader{ | ||||||
|  | 		//允许跨域 | ||||||
|  | 		CheckOrigin: func(r *http.Request) bool { | ||||||
|  | 			return true | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	//连接map池 | ||||||
|  | 	mapConnPool = sync.Map{} | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| // 处理逻辑后 w,r 如:重定向, resp 必须重新处理 | // 每个连接的连接属性 | ||||||
| // func (l *DataTransferLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) { | type wsConnectItem struct { | ||||||
| // // httpx.OkJsonCtx(r.Context(), w, resp) | 	conn        *websocket.Conn //websocket的连接 | ||||||
| // } | 	closeChan   chan struct{}   //关闭chan | ||||||
|  | 	isClose     bool            //是否已经关闭 | ||||||
|  | 	flag        string | ||||||
|  | 	inChan      chan []byte //接受消息缓冲通道 | ||||||
|  | 	outChan     chan []byte //发送回客户端的消息 | ||||||
|  | 	mutex       sync.Mutex | ||||||
|  | 	renderImage map[string]struct{} //需要渲染的图片 | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| func (l *DataTransferLogic) DataTransfer(req *types.DataTransferReq, userinfo *auth.UserInfo) (resp *basic.Response) { | func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.ResponseWriter, r *http.Request) (resp *basic.Response) { | ||||||
| 	// 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data) | 	//升级websocket | ||||||
| 	// userinfo 传入值时, 一定不为null | 	conn, err := upgrade.Upgrade(w, r, nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		logx.Error("http upgrade websocket err:", err) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	defer conn.Close() | ||||||
|  | 	rsp := types.DataTransferData{} | ||||||
|  | 	/*isAuth := true | ||||||
|  | 	// 解析JWT token,并对空用户进行判断 | ||||||
|  | 	claims, err := svcCtx.ParseJwtToken(r) | ||||||
|  | 	// 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 | ||||||
|  | 	if err != nil { | ||||||
|  | 		rsp.T = constants.WEBSOCKET_UNAUTH | ||||||
|  | 		rsp.D = "unAuth" | ||||||
|  | 		b, _ := json.Marshal(rsp) | ||||||
|  | 		_ = conn.WriteMessage(websocket.TextMessage, b) | ||||||
|  | 		isAuth = false | ||||||
|  | 	} | ||||||
|  | 	if claims != nil { | ||||||
|  | 		// 从token中获取对应的用户信息 | ||||||
|  | 		_, err = auth.GetUserInfoFormMapClaims(claims) | ||||||
|  | 		// 如果获取用户信息出错,则返回未授权的JSON响应并记录错误消息 | ||||||
|  | 		if err != nil { | ||||||
|  | 			rsp.T = constants.WEBSOCKET_UNAUTH | ||||||
|  | 			rsp.D = "unAuth!!" | ||||||
|  | 			b, _ := json.Marshal(rsp) | ||||||
|  | 			_ = conn.WriteMessage(websocket.TextMessage, b) | ||||||
|  | 			isAuth = false | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		// 如果claims为nil,则认为用户身份为白板用户 | ||||||
|  | 		rsp.T = constants.WEBSOCKET_UNAUTH | ||||||
|  | 		rsp.D = "unAuth!!!" | ||||||
|  | 		b, _ := json.Marshal(rsp) | ||||||
|  | 		_ = conn.WriteMessage(websocket.TextMessage, b) | ||||||
|  | 		isAuth = false | ||||||
|  | 	} | ||||||
|  | 	//不是授权的连接(10秒后关闭) | ||||||
|  | 	if !isAuth { | ||||||
|  | 		select { | ||||||
|  | 		case <-time.After(time.Second * 10): | ||||||
|  | 			conn.Close() | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	}*/ | ||||||
|  | 	//生成连接唯一标识 | ||||||
|  | 	flag := uuid.New().String() + time.Now().Format("20060102150405") | ||||||
|  | 	ws := wsConnectItem{ | ||||||
|  | 		conn:        conn, | ||||||
|  | 		flag:        flag, | ||||||
|  | 		closeChan:   make(chan struct{}, 1), | ||||||
|  | 		inChan:      make(chan []byte, 100), | ||||||
|  | 		outChan:     make(chan []byte, 100), | ||||||
|  | 		renderImage: make(map[string]struct{}), | ||||||
|  | 		isClose:     false, | ||||||
|  | 	} | ||||||
|  | 	//保存连接 | ||||||
|  | 	mapConnPool.Store(flag, ws) | ||||||
|  | 	defer ws.close() | ||||||
|  | 	//把连接成功消息发回去 | ||||||
|  | 	rsp.T = constants.WEBSOCKET_CONNECT_SUCCESS | ||||||
|  | 	rsp.D = flag | ||||||
|  | 	b, _ := json.Marshal(rsp) | ||||||
|  | 	conn.WriteMessage(websocket.TextMessage, b) | ||||||
|  | 	//循环读客户端信息 | ||||||
|  | 	go ws.readLoop() | ||||||
|  | 	//循环把数据发送给客户端 | ||||||
|  | 	go ws.writeLoop() | ||||||
|  | 	//推消息到云渲染 | ||||||
|  | 	go ws.sendLoop() | ||||||
|  | 	//心跳 | ||||||
|  | 	ws.heartbeat() | ||||||
| 
 | 
 | ||||||
| 	return resp.SetStatus(basic.CodeOK) | 	return resp.SetStatus(basic.CodeOK) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // 心跳 | ||||||
|  | func (w *wsConnectItem) heartbeat() { | ||||||
|  | 	rsp := types.DataTransferData{ | ||||||
|  | 		T: constants.WEBSOCKET_HEARTBEAT, | ||||||
|  | 		D: "", | ||||||
|  | 	} | ||||||
|  | 	b, _ := json.Marshal(rsp) | ||||||
|  | 	for { | ||||||
|  | 		time.Sleep(time.Second * 10) | ||||||
|  | 		select { | ||||||
|  | 		case <-w.closeChan: | ||||||
|  | 			return | ||||||
|  | 		default: | ||||||
|  | 
 | ||||||
|  | 		} | ||||||
|  | 		//发送心跳信息 | ||||||
|  | 		if err := w.conn.WriteMessage(websocket.TextMessage, b); err != nil { | ||||||
|  | 			logx.Error("发送心跳信息异常,关闭连接:", w.flag, err) | ||||||
|  | 			w.close() | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // 关闭连接 | ||||||
|  | func (w *wsConnectItem) close() { | ||||||
|  | 	w.mutex.Lock() | ||||||
|  | 	defer w.mutex.Unlock() | ||||||
|  | 	logx.Info("websocket:", w.flag, " is closing...") | ||||||
|  | 	w.conn.Close() | ||||||
|  | 	mapConnPool.Delete(w.flag) | ||||||
|  | 	if !w.isClose { | ||||||
|  | 		w.isClose = true | ||||||
|  | 		close(w.closeChan) | ||||||
|  | 		close(w.outChan) | ||||||
|  | 		close(w.inChan) | ||||||
|  | 	} | ||||||
|  | 	logx.Info("websocket:", w.flag, " is closed") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (w *wsConnectItem) writeLoop() { | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-w.closeChan: //如果关闭了 | ||||||
|  | 			return | ||||||
|  | 		case data := <-w.outChan: | ||||||
|  | 			w.conn.WriteMessage(websocket.TextMessage, data) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // 接受客户端发来的消息 | ||||||
|  | func (w *wsConnectItem) readLoop() { | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-w.closeChan: //如果关闭了 | ||||||
|  | 			return | ||||||
|  | 		default: | ||||||
|  | 
 | ||||||
|  | 		} | ||||||
|  | 		_, data, err := w.conn.ReadMessage() | ||||||
|  | 		if err != nil { | ||||||
|  | 			logx.Error("接受信息错误:", err) | ||||||
|  | 			//关闭连接 | ||||||
|  | 			w.close() | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		//消息传入缓冲通道 | ||||||
|  | 		w.inChan <- data | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // 把收到的消息发往不同的地方处理 | ||||||
|  | func (w *wsConnectItem) sendLoop() { | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-w.closeChan: | ||||||
|  | 			return | ||||||
|  | 		case data := <-w.inChan: | ||||||
|  | 			w.dealwithReciveData(data) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // 处理接受到的数据 | ||||||
|  | func (w *wsConnectItem) dealwithReciveData(data []byte) { | ||||||
|  | 	var parseInfo types.DataTransferData | ||||||
|  | 	if err := json.Unmarshal(data, &parseInfo); err != nil { | ||||||
|  | 		logx.Error("invalid format of websocket message") | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	switch parseInfo.T { | ||||||
|  | 	//图片渲染 | ||||||
|  | 	case constants.WEBSOCKET_RENDER_IMAGE: | ||||||
|  | 		var renderImageData []types.RenderImageReqMsg | ||||||
|  | 		if err := json.Unmarshal([]byte(parseInfo.D), &renderImageData); err != nil { | ||||||
|  | 			logx.Error("invalid format of websocket render image message", err) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		logx.Info("收到请求云渲染图片数据:", renderImageData) | ||||||
|  | 		w.mutex.Lock() | ||||||
|  | 		defer w.mutex.Unlock() | ||||||
|  | 		//把需要渲染的图片加进去 | ||||||
|  | 		for _, v := range renderImageData { | ||||||
|  | 			key := w.getRenderImageMapKey(v.ProductId, v.SizeId, v.TemplateId) | ||||||
|  | 			w.renderImage[key] = struct{}{} | ||||||
|  | 		} | ||||||
|  | 	default: | ||||||
|  | 
 | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // 把渲染好的数据放入outchan | ||||||
|  | func (w *wsConnectItem) setOutRenderImage(req *types.RenderNotifyReq) { | ||||||
|  | 	w.mutex.Lock() | ||||||
|  | 	defer w.mutex.Unlock() | ||||||
|  | 	for _, notifyItem := range req.NotifyList { | ||||||
|  | 		renderKey := w.getRenderImageMapKey(notifyItem.ProductId, notifyItem.SizeId, notifyItem.TemplateId) | ||||||
|  | 		//查询 | ||||||
|  | 		_, ok := w.renderImage[renderKey] | ||||||
|  | 		if !ok { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		responseData := types.RenderImageRspMsg{ | ||||||
|  | 			ProductId:  notifyItem.ProductId, | ||||||
|  | 			SizeId:     notifyItem.SizeId, | ||||||
|  | 			TemplateId: notifyItem.TemplateId, | ||||||
|  | 			Source:     "我是渲染资源", | ||||||
|  | 		} | ||||||
|  | 		b, _ := json.Marshal(responseData) | ||||||
|  | 		select { | ||||||
|  | 		case <-w.closeChan: | ||||||
|  | 			return | ||||||
|  | 		case w.outChan <- b: | ||||||
|  | 			logx.Info("notify send render result to out chan") | ||||||
|  | 		} | ||||||
|  | 		//删掉已经处理的渲染任务 | ||||||
|  | 		delete(w.renderImage, renderKey) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // 获取需要渲染图片的map key | ||||||
|  | func (w *wsConnectItem) getRenderImageMapKey(productId, sizeId, templateId int64) string { | ||||||
|  | 	return fmt.Sprintf("%d-%d-%d", productId, sizeId, templateId) | ||||||
|  | } | ||||||
|  | |||||||
| @ -1,8 +1,13 @@ | |||||||
| package logic | package logic | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"fusenapi/utils/auth" | 	"crypto/sha256" | ||||||
|  | 	"encoding/hex" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"fusenapi/constants" | ||||||
| 	"fusenapi/utils/basic" | 	"fusenapi/utils/basic" | ||||||
|  | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"context" | 	"context" | ||||||
| 
 | 
 | ||||||
| @ -35,9 +40,30 @@ func NewRenderNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Rend | |||||||
| // // httpx.OkJsonCtx(r.Context(), w, resp) | // // httpx.OkJsonCtx(r.Context(), w, resp) | ||||||
| // } | // } | ||||||
| 
 | 
 | ||||||
| func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { | func (l *RenderNotifyLogic) RenderNotify(req *types.RenderNotifyReq) (resp *basic.Response) { | ||||||
| 	// 返回值必须调用Set重新返回, resp可以空指针调用 resp.SetStatus(basic.CodeOK, data) | 	if len(req.NotifyList) == 0 { | ||||||
| 	// userinfo 传入值时, 一定不为null | 		return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "notify list is empty") | ||||||
| 
 | 	} | ||||||
|  | 	if time.Now().Unix()-120 > req.Time /*|| req.Time > time.Now().Unix() */ { | ||||||
|  | 		return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "time is expire") | ||||||
|  | 	} | ||||||
|  | 	//验证签名 sha256 | ||||||
|  | 	notifyByte, _ := json.Marshal(req.NotifyList) | ||||||
|  | 	h := sha256.New() | ||||||
|  | 	h.Write([]byte(fmt.Sprintf(constants.RENDER_NOTIFY_SIGN_KEY, string(notifyByte), req.Time))) | ||||||
|  | 	signHex := h.Sum(nil) | ||||||
|  | 	sign := hex.EncodeToString(signHex) | ||||||
|  | 	if req.Sign != sign { | ||||||
|  | 		return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "invalid") | ||||||
|  | 	} | ||||||
|  | 	//遍历websocket链接 | ||||||
|  | 	mapConnPool.Range(func(key, value any) bool { | ||||||
|  | 		ws, ok := value.(wsConnectItem) | ||||||
|  | 		if !ok { | ||||||
|  | 			return false | ||||||
|  | 		} | ||||||
|  | 		ws.setOutRenderImage(req) | ||||||
|  | 		return true | ||||||
|  | 	}) | ||||||
| 	return resp.SetStatus(basic.CodeOK) | 	return resp.SetStatus(basic.CodeOK) | ||||||
| } | } | ||||||
|  | |||||||
| @ -5,12 +5,7 @@ import ( | |||||||
| 	"fusenapi/utils/basic" | 	"fusenapi/utils/basic" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type DataTransferReq struct { | type DataTransferData struct { | ||||||
| 	T string `json:"t"` //消息类型 |  | ||||||
| 	D string `json:"d"` //传递的消息 |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type DataTransferRsp struct { |  | ||||||
| 	T string `json:"t"` //消息类型 | 	T string `json:"t"` //消息类型 | ||||||
| 	D string `json:"d"` //传递的消息 | 	D string `json:"d"` //传递的消息 | ||||||
| } | } | ||||||
|  | |||||||
| @ -11,18 +11,14 @@ import "basic.api" | |||||||
| service websocket { | service websocket { | ||||||
| 	//websocket数据交互 | 	//websocket数据交互 | ||||||
| 	@handler DataTransferHandler | 	@handler DataTransferHandler | ||||||
| 	get /api/websocket/data_transfer(DataTransferReq) returns (response); | 	get /api/websocket/data_transfer(request) returns (response); | ||||||
| 	//渲染完了通知接口 | 	//渲染完了通知接口 | ||||||
| 	@handler RenderNotifyHandler | 	@handler RenderNotifyHandler | ||||||
| 	post /api/websocket/render_notify(RenderNotifyReq) returns (response); | 	post /api/websocket/render_notify(RenderNotifyReq) returns (response); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| //websocket数据交互 | //websocket数据交互 | ||||||
| type DataTransferReq { | type DataTransferData { | ||||||
| 	T string `json:"t"` //消息类型 |  | ||||||
| 	D string `json:"d"` //传递的消息 |  | ||||||
| } |  | ||||||
| type DataTransferRsp { |  | ||||||
| 	T string `json:"t"` //消息类型 | 	T string `json:"t"` //消息类型 | ||||||
| 	D string `json:"d"` //传递的消息 | 	D string `json:"d"` //传递的消息 | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user