diff --git a/go.mod b/go.mod index 2c4e7279..62244004 100644 --- a/go.mod +++ b/go.mod @@ -68,11 +68,10 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect - gorm.io/datatypes v1.2.0 // indirect ) require ( - github.com/474420502/requests v1.40.0 + github.com/474420502/requests v1.42.0 github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/tidwall/gjson v1.12.0 @@ -114,7 +113,7 @@ require ( go.opentelemetry.io/otel/trace v1.14.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/automaxprocs v1.5.2 // indirect - golang.org/x/net v0.12.0 + golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect golang.org/x/text v0.11.0 google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect diff --git a/go.sum b/go.sum index 6471a8ae..815b0b69 100644 --- a/go.sum +++ b/go.sum @@ -38,8 +38,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/474420502/passer v0.0.1 h1:ZWnt7hpFzsYDV7LHSEyLvLUvW5mRxrnDmgFdIl17q3w= github.com/474420502/passer v0.0.1/go.mod h1:MmnnrF9d51sPkFzdRq2pQtxQKqyjburVM1LjMbOCezE= github.com/474420502/random v0.4.1 h1:HUUyLXRWMijVb7CJoEC16f0aFQOW25Lkr80Mut6PoKU= -github.com/474420502/requests v1.40.0 h1:VDuLxSG/3IGBvMfjPV8+o7s1l5mOwLAgfo5Og6vMAJw= -github.com/474420502/requests v1.40.0/go.mod h1:2SCVzim0ONFYG09g/GrM7RTeJIC6qTyZfnohsjnG5C8= +github.com/474420502/requests v1.42.0 h1:aUj0rWhfldbOOlGHDIcqT9zgXEoSlK4IBmRF3LxI1+Y= +github.com/474420502/requests v1.42.0/go.mod h1:2SCVzim0ONFYG09g/GrM7RTeJIC6qTyZfnohsjnG5C8= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -979,8 +979,6 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gorm.io/datatypes v1.2.0 h1:5YT+eokWdIxhJgWHdrb2zYUimyk0+TaFth+7a0ybzco= -gorm.io/datatypes v1.2.0/go.mod h1:o1dh0ZvjIjhH/bngTpypG6lVRJ5chTBxE09FH/71k04= gorm.io/driver/mysql v1.5.1 h1:WUEH5VF9obL/lTtzjmML/5e6VfFR/788coz2uaVCAZw= gorm.io/driver/mysql v1.5.1/go.mod h1:Jo3Xu7mMhCyj8dlrb3WoCaRd1FhsVh+yMXb1jUInf5o= gorm.io/gorm v1.25.1 h1:nsSALe5Pr+cM3V1qwwQ7rOkw+6UeLrX5O4v3llhHa64= diff --git a/model/gmodel/fs_guest_logic.go b/model/gmodel/fs_guest_logic.go index 938a5cd8..6e49a74c 100755 --- a/model/gmodel/fs_guest_logic.go +++ b/model/gmodel/fs_guest_logic.go @@ -9,14 +9,14 @@ import ( "gorm.io/gorm" ) -func (m *FsGuestModel) GenerateGuestID(ctx context.Context, AccessSecret *string) (authKey string, err error) { +func (m *FsGuestModel) GenerateGuestID(ctx context.Context, AccessSecret uint64) (authKey string, err error) { err = m.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { now := time.Now().Unix() var record = &FsGuest{} tx.Create(record) - authKey, err = auth.GenerateJwtToken(AccessSecret, now, 31536000, 0, int64(record.GuestId)) + authKey, err = auth.GenerateJwtTokenUint64(AccessSecret, now, 31536000, 0, int64(record.GuestId)) if err != nil { logx.Error(err) err = tx.Rollback().Error diff --git a/model/gmodel/fs_user_logic.go b/model/gmodel/fs_user_logic.go index fee0fbe6..7c1da1a4 100644 --- a/model/gmodel/fs_user_logic.go +++ b/model/gmodel/fs_user_logic.go @@ -2,6 +2,7 @@ package gmodel import ( "context" + "fmt" "fusenapi/utils/auth" "time" @@ -45,6 +46,37 @@ func (u *FsUserModel) Transaction(ctx context.Context, fc func(tx *gorm.DB) erro return u.db.WithContext(ctx).Transaction(fc) } +// 继承guest_id的资源表 +func InheritGuestIdResource(tx *gorm.DB, userId, guestId int64) error { + var err error + if guestId != 0 { + // 继承guest_id的资源表 + err = tx.Model(&FsResource{}). + Where("guest_id = ?", guestId). + UpdateColumn("user_id", userId).Error + + if err != nil { + return err + } + + err = tx.Model(&FsUserMaterial{}). + Where("guest_id = ?", guestId). + UpdateColumn("user_id", userId).Error + + if err != nil { + return err + } + + err = tx.Model(&FsUserInfo{}). + Where("guest_id = ?", guestId). + UpdateColumn("user_id", userId).Error + if err != nil { + return err + } + } + return fmt.Errorf("guest_id must not be 0") +} + // 谷歌平台的注册流程 func (u *FsUserModel) RegisterByGoogleOAuth(ctx context.Context, token *auth.RegisterToken) (*FsUser, error) { user := &FsUser{} @@ -67,11 +99,8 @@ func (u *FsUserModel) RegisterByGoogleOAuth(ctx context.Context, token *auth.Reg if token.GuestId != 0 { // 继承guest_id的资源表 - return tx.Model(&FsResource{}). - Where("guest_id = ?", token.GuestId). - UpdateColumn("user_id", user.Id).Error + return InheritGuestIdResource(tx, user.Id, token.GuestId) } - return err } @@ -90,6 +119,46 @@ func (u *FsUserModel) RegisterByGoogleOAuth(ctx context.Context, token *auth.Reg return user, nil } +// 自平台的注册流程 +func (u *FsUserModel) RegisterByFusen(ctx context.Context, token *auth.RegisterToken) (*FsUser, error) { + user := &FsUser{} + + err := u.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + + err := tx.Model(user).Where("email = ?", token.Email).Take(user).Error + if err != nil { + // 没有找到在数据库就创建注册 + if err == gorm.ErrRecordNotFound { + createAt := time.Now().Unix() + user.Email = &token.Email + user.CreatedAt = &createAt + user.PasswordHash = &token.Password + err = tx.Model(user).Create(user).Error + if err != nil { + return err + } + if token.GuestId != 0 { + // 继承guest_id的资源表 + return InheritGuestIdResource(tx, user.Id, token.GuestId) + } + return err + } + return err + } + + if err == nil { + return fmt.Errorf("the email had registered") + } + return err + }) + + if err != nil { + return nil, err + } + + return user, nil +} + func (u *FsUserModel) UpdateUserBasicInfoById(ctx context.Context, Id int64, user *UserBasicInfoForSave) (err error) { err = u.db.WithContext(ctx).Model(&FsUser{}). diff --git a/server/auth/internal/logic/acceptcookielogic.go b/server/auth/internal/logic/acceptcookielogic.go index aa57d085..963fdfd9 100644 --- a/server/auth/internal/logic/acceptcookielogic.go +++ b/server/auth/internal/logic/acceptcookielogic.go @@ -44,7 +44,7 @@ func (l *AcceptCookieLogic) AcceptCookie(req *types.Request, userinfo *auth.User } m := l.svcCtx.AllModels.FsGuest - token, err := m.GenerateGuestID(l.ctx, &l.svcCtx.Config.Auth.AccessSecret) + token, err := m.GenerateGuestID(l.ctx, auth.DefaultJwtSecret) if err != nil { return resp.SetStatus(basic.CodeGuestGenErr) } diff --git a/server/auth/internal/logic/email_manager.go b/server/auth/internal/logic/email_manager.go index 928d1230..05a74cdc 100644 --- a/server/auth/internal/logic/email_manager.go +++ b/server/auth/internal/logic/email_manager.go @@ -54,7 +54,7 @@ func (m *EmailSender) ProcessEmailTasks() { m.emailSending[emailformat.TargetEmail] = &EmailTask{ Email: emailformat, - SendTime: time.Now(), + SendTime: time.Now().UTC(), } m.lock.Unlock() @@ -82,7 +82,7 @@ func (m *EmailSender) Resend(emailTarget string, content []byte) { defer m.lock.Unlock() // Check if the email task still exists and has not been sent successfully - if task, ok := m.emailSending[emailTarget]; ok && task.SendTime.Add(m.ResendTimeLimit).After(time.Now()) { + if task, ok := m.emailSending[emailTarget]; ok && task.SendTime.Add(m.ResendTimeLimit).After(time.Now().UTC()) { err := smtp.SendMail(emailTarget, m.Auth, m.FromEmail, []string{emailTarget}, content) if err != nil { log.Printf("Failed to resend email to %s: %v\n", emailTarget, err) @@ -102,7 +102,7 @@ func (m *EmailSender) ClearExpiredTasks() { m.lock.Lock() for email, task := range m.emailSending { - if task.SendTime.Add(m.ResendTimeLimit).Before(time.Now()) { + if task.SendTime.Add(m.ResendTimeLimit).Before(time.Now().UTC()) { delete(m.emailSending, email) } } diff --git a/server/auth/internal/logic/useremailconfirmationlogic.go b/server/auth/internal/logic/useremailconfirmationlogic.go index 28103179..163ca27b 100644 --- a/server/auth/internal/logic/useremailconfirmationlogic.go +++ b/server/auth/internal/logic/useremailconfirmationlogic.go @@ -1,9 +1,10 @@ package logic import ( + "fmt" "fusenapi/utils/auth" "fusenapi/utils/basic" - "log" + "fusenapi/utils/wevent" "time" "context" @@ -11,6 +12,7 @@ import ( "fusenapi/server/auth/internal/svc" "fusenapi/server/auth/internal/types" + "github.com/474420502/requests" "github.com/zeromicro/go-zero/core/logx" ) @@ -53,7 +55,7 @@ func (l *UserEmailConfirmationLogic) UserEmailConfirmation(req *types.RequestEma // 谷歌平台的注册流程 user, err := l.svcCtx.AllModels.FsUser.RegisterByGoogleOAuth(l.ctx, token) if err != nil { - logx.Error(err) + logx.Error(err, token.TraceId) return resp.SetStatus(basic.CodeDbSqlErr) } @@ -67,15 +69,75 @@ func (l *UserEmailConfirmationLogic) UserEmailConfirmation(req *types.RequestEma ) if err != nil { - logx.Error(err) + logx.Error(err, token.TraceId) return } - log.Println(jwtToken) // 通过websocket去, 送回通道 + event := wevent.NewWebsocketEventSuccess(wevent.UserEmailRegister, token.TraceId) + event.Data = wevent.DataEmailRegister{ + JwtToken: jwtToken, + } + + tp := requests.Post(fmt.Sprintf("%s/api/websocket/common_notify", l.svcCtx.Config.MainAddress)) + tp.SetBodyJson(requests.M{ + "wid": token.Wid, + "data": event, + }) + + wresp, err := tp.Execute() + if err != nil { + logx.Error(err, token.TraceId) + return + } + + result := wresp.Json() + if result.Get("code").Int() != 200 { + logx.Error(result.Get("message")) + return + } + logx.Info("success", token.TraceId, jwtToken) case "facebook": case "fusen": + user, err := l.svcCtx.AllModels.FsUser.RegisterByFusen(l.ctx, token) + if err != nil { + logx.Error(err, token.TraceId) + return resp.SetStatus(basic.CodeDbSqlErr) + } + + // 创建签证 + jwtToken, err := auth.GenerateJwtTokenUint64( + auth.StringToHash(*user.PasswordHash), + l.svcCtx.Config.Auth.AccessExpire, + time.Now().Unix(), + user.Id, + 0, + ) + + if err != nil { + logx.Error(err, token.TraceId) + return + } + + event := wevent.NewWebsocketEventSuccess(wevent.UserEmailRegister, token.TraceId) + event.Data = wevent.DataEmailRegister{ + JwtToken: jwtToken, + } + tp := requests.Post(fmt.Sprintf("%s/api/websocket/common_notify", l.svcCtx.Config.MainAddress)) + tp.SetBodyJson(requests.M{ + "wid": token.Wid, + "data": event, + }) + wresp, err := tp.Execute() + if err != nil { + logx.Error(err, token.TraceId) + } + result := wresp.Json() + if result.Get("code").Int() != 200 { + logx.Error(result.Get("message")) + } + logx.Info("success", token.TraceId, jwtToken) } default: diff --git a/server/auth/internal/logic/usergoogleloginlogic.go b/server/auth/internal/logic/usergoogleloginlogic.go index d84bc20f..8da743ab 100644 --- a/server/auth/internal/logic/usergoogleloginlogic.go +++ b/server/auth/internal/logic/usergoogleloginlogic.go @@ -17,6 +17,7 @@ import ( "fusenapi/server/auth/internal/types" "github.com/474420502/requests" + "github.com/google/uuid" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/rest/httpx" "golang.org/x/oauth2" @@ -91,9 +92,10 @@ func (l *UserGoogleLoginLogic) UserGoogleLogin(req *types.RequestGoogleLogin, us l.registerInfo = &auth.RegisterToken{ Id: googleId, - Password: base64.URLEncoding.EncodeToString(nonce), + Password: base64.RawURLEncoding.EncodeToString(nonce), Platform: "google", OperateType: auth.OpTypeRegister, + TraceId: uuid.NewString(), CreateAt: time.Now(), } diff --git a/server/auth/internal/logic/userregisterlogic.go b/server/auth/internal/logic/userregisterlogic.go index 9d61b50d..12b87530 100644 --- a/server/auth/internal/logic/userregisterlogic.go +++ b/server/auth/internal/logic/userregisterlogic.go @@ -10,6 +10,7 @@ import ( "fusenapi/server/auth/internal/svc" "fusenapi/server/auth/internal/types" + "github.com/google/uuid" "github.com/zeromicro/go-zero/core/logx" ) @@ -48,6 +49,7 @@ func (l *UserRegisterLogic) UserRegister(req *types.RequestUserRegister, userinf Email: req.Email, Password: req.Password, Platform: "fusen", + TraceId: uuid.NewString(), CreateAt: time.Now(), } diff --git a/server/auth/internal/logic/userresettokenlogic.go b/server/auth/internal/logic/userresettokenlogic.go index 680fbdf6..aeffc953 100644 --- a/server/auth/internal/logic/userresettokenlogic.go +++ b/server/auth/internal/logic/userresettokenlogic.go @@ -10,6 +10,7 @@ import ( "fusenapi/server/auth/internal/svc" "fusenapi/server/auth/internal/types" + "github.com/google/uuid" "github.com/zeromicro/go-zero/core/logx" ) @@ -51,6 +52,7 @@ func (l *UserResetTokenLogic) UserResetToken(req *types.RequestUserResetToken, u Wid: req.Wid, Email: *user.Email, OldPassword: *user.PasswordHash, + TraceId: uuid.NewString(), CreateAt: time.Now(), } diff --git a/server/auth/internal/logic/websocket_test.go b/server/auth/internal/logic/websocket_test.go new file mode 100644 index 00000000..da4fb0e0 --- /dev/null +++ b/server/auth/internal/logic/websocket_test.go @@ -0,0 +1,40 @@ +package logic + +import ( + "fmt" + "fusenapi/utils/wevent" + "log" + "testing" + + "github.com/474420502/requests" + "github.com/google/uuid" + "github.com/zeromicro/go-zero/core/logx" +) + +func TestSender(t *testing.T) { + traceId := uuid.NewString() + event := wevent.NewWebsocketEventSuccess(wevent.UserEmailRegister, traceId) + event.Data = wevent.DataEmailRegister{ + JwtToken: traceId, + } + + tp := requests.Post(fmt.Sprintf("%s/api/websocket/common_notify", "https://server.fusen.3718.cn:9900")) + tp.SetBodyJson(requests.M{ + "wid": "tGyMYX9EldtsPLZTyT6PxrRgEV615CQGEiu9Sb1XrjZ4kpTjI46sQyh7kYfVlgN9uR5Uw4KDF+S62IknmaRgSMdee1QHVtCv+VEKrMF76snR04zS1ZbWZCgX5Lv2xgHz/bZBWwJF/9u6YTy2/FetGg==", + "data": event, + }) + + wresp, err := tp.Execute() + if err != nil { + logx.Error(err, "traceId") + return + } + + result := wresp.Json() + if result.Get("code").Int() != 200 { + logx.Error(result.Get("message")) + return + } + log.Println(result) + +} diff --git a/server/websocket/internal/logic/commonnotifylogic.go b/server/websocket/internal/logic/commonnotifylogic.go index 7d13abac..2e5e6e65 100644 --- a/server/websocket/internal/logic/commonnotifylogic.go +++ b/server/websocket/internal/logic/commonnotifylogic.go @@ -5,6 +5,8 @@ import ( "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" + "sync" + "time" "fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/types" @@ -26,6 +28,68 @@ func NewCommonNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm } } +// 定义公共回调未找到websocket连接时暂存数据缓冲队列 +var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 2000) + +type commonConnectionNotFoundDataCacheChanItem struct { + retryTimes int //重回队列次数 + data types.CommonNotifyReq //数据 +} + +// 放入缓冲队列 +func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCacheChanItem) { + select { + case commonConnectionNotFoundDataCacheChan <- data: + return + case <-time.After(time.Millisecond * 50): //超50ms就丢弃 + return + } +} + +// 取出元素 +func (l *CommonNotifyLogic) popCommonCache() (data commonConnectionNotFoundDataCacheChanItem) { + return <-commonConnectionNotFoundDataCacheChan +} + +// 保证处理消息就一个循环在执行 +var consumeCommonCacheData sync.Once + +// 消费公共通知未处理的消息 +func (l *CommonNotifyLogic) consumeCommonCacheData() { + //单例 + consumeCommonCacheData.Do(func() { + tick := time.Tick(time.Millisecond * 200) + for { + select { + case <-tick: //200毫秒触发一次 + info := l.popCommonCache() + //查询websocket连接 + value, ok := mapConnPool.Load(info.data.Wid) + //没有连接 + if !ok { + info.retryTimes-- + //大于0,则放回队列 + if info.retryTimes > 0 { + l.pushCommonCache(info) + continue + } + //否则直接丢弃消息 + continue + } + //断言连接 + ws, ok := value.(wsConnectItem) + if !ok { + logx.Error("渲染回调断言websocket连接失败") + continue + } + //发送 + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) + } + + } + }) +} + // 处理进入前逻辑w,r // func (l *CommonNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) { // } @@ -35,10 +99,20 @@ func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *a if req.Wid == "" { return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty") } + //触发消费公共未处理的消息(该方法是单例) + go l.consumeCommonCacheData() //查询websocket连接 value, ok := mapConnPool.Load(req.Wid) if !ok { - return resp.SetStatusWithMessage(basic.CodeOK, "success,but connection is not found") + //没找到连接就放到公共缓冲队列 + go l.pushCommonCache(commonConnectionNotFoundDataCacheChanItem{ + retryTimes: 20, //重试20次 + data: types.CommonNotifyReq{ + Wid: req.Wid, + Data: req.Data, + }, + }) + return resp.SetStatusWithMessage(basic.CodeOK, "success") } //断言连接 ws, ok := value.(wsConnectItem) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 5cc978c2..02071c5c 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -5,6 +5,7 @@ import ( "bytes" "encoding/hex" "encoding/json" + "errors" "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/encryption_decryption" @@ -138,8 +139,8 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request) // 设置连接 func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.UserInfo, isFirefoxBrowser bool) (wsConnectItem, error) { - //生成连接唯一标识 - uniqueId, err := l.getUniqueId(userInfo) + //生成连接唯一标识(失败重试10次) + uniqueId, err := l.getUniqueId(userInfo, 10) if err != nil { //发送获取唯一标识失败的消息 l.sendGetUniqueIdErrResponse(conn) @@ -170,11 +171,15 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User } // 获取唯一id -func (l *DataTransferLogic) getUniqueId(userInfo auth.UserInfo) (uniqueId string, err error) { +func (l *DataTransferLogic) getUniqueId(userInfo auth.UserInfo, retryTimes int) (uniqueId string, err error) { + if retryTimes < 0 { + return "", errors.New("failed to get unique id") + } //后面拼接上用户id uniqueId = hex.EncodeToString([]byte(uuid.New().String())) + getUserJoinPart(userInfo.UserId, userInfo.GuestId) + //存在则从新获取 if _, ok := mapConnPool.Load(uniqueId); ok { - uniqueId, err = l.getUniqueId(userInfo) + uniqueId, err = l.getUniqueId(userInfo, retryTimes-1) if err != nil { return "", err } @@ -204,6 +209,7 @@ func (l *DataTransferLogic) checkAuth(r *http.Request) (isAuth bool, userInfo *a logx.Error(err) return false, nil } + //todo 对接登录后要删除 userInfo.UserId = 39 userInfo.GuestId = 0 return true, userInfo diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image.go similarity index 95% rename from server/websocket/internal/logic/ws_render_image_logic.go rename to server/websocket/internal/logic/ws_render_image.go index f860adda..4ac0f8f5 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image.go @@ -165,10 +165,14 @@ func (w *wsConnectItem) consumeRenderCache(data []byte) { } else { //返回给客户端 b := w.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, websocket_data.RenderImageRspMsg{ - RenderId: renderImageData.RenderId, - Image: *resource.ResourceUrl, - CombineTakesTime: "cache", - UnityRenderTakesTime: "cache", + RenderId: renderImageData.RenderId, + Image: *resource.ResourceUrl, + RenderProcessTime: websocket_data.RenderProcessTime{ + CombineTakesTime: "cache", + UnityRenderTakesTime: "cache", + UploadCombineImageTakesTime: "cache", + UploadUnityRenderImageTakesTime: "cache", + }, }) //发送数据到out chan w.sendToOutChan(b) @@ -228,7 +232,7 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re } res, err := w.logic.svcCtx.Repositories.ImageHandle.LogoCombine(w.logic.ctx, &combineReq) if err != nil { - w.renderErrResponse(info.RenderId, info.RenderData.TemplateTag, taskId, "failed to combine image", w.userId, w.guestId, productTemplate.Id, model3dInfo.Id, productFirstSize.Id) + w.renderErrResponse(info.RenderId, info.RenderData.TemplateTag, taskId, "failed to combine image:"+err.Error(), w.userId, w.guestId, productTemplate.Id, model3dInfo.Id, productFirstSize.Id) logx.Error("合成刀版图失败,合成请求数据:", combineReq, "错误信息:", err) return err } @@ -438,12 +442,14 @@ func (w *wsConnectItem) operationRenderTask() { } //发送到出口 w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_RENDER_IMAGE, websocket_data.RenderImageRspMsg{ - RenderId: taskData.RenderId, - Image: data.RenderNotifyImageUrl, - CombineTakesTime: CombineTakesTime, - UnityRenderTakesTime: UnityRenderTakesTime, - UploadCombineImageTakesTime: uploadCombineImageTakesTime, - UploadUnityRenderImageTakesTime: uploadUnityRenderImageTakesTime, + RenderId: taskData.RenderId, + Image: data.RenderNotifyImageUrl, + RenderProcessTime: websocket_data.RenderProcessTime{ + CombineTakesTime: CombineTakesTime, + UnityRenderTakesTime: UnityRenderTakesTime, + UploadCombineImageTakesTime: uploadCombineImageTakesTime, + UploadUnityRenderImageTakesTime: uploadUnityRenderImageTakesTime, + }, })) } //删除任务 diff --git a/server/websocket/internal/logic/ws_reuse_last_connect.go b/server/websocket/internal/logic/ws_reuse_last_connect.go index 5a6d9a44..bab5d61b 100644 --- a/server/websocket/internal/logic/ws_reuse_last_connect.go +++ b/server/websocket/internal/logic/ws_reuse_last_connect.go @@ -53,6 +53,7 @@ func (w *wsConnectItem) reuseLastConnect(data []byte) { } //重新绑定 w.uniqueId = wid + mapConnPool.Store(wid, *w) rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) w.sendToOutChan(rsp) return diff --git a/utils/auth/confirmation_link_test.go b/utils/auth/confirmation_link_test.go index 12e31e5b..a34fc6bc 100644 --- a/utils/auth/confirmation_link_test.go +++ b/utils/auth/confirmation_link_test.go @@ -25,7 +25,7 @@ func BenchmarkConfirmationLink(b *testing.B) { cl := NewConfirmationLink[Register](key, "http://localhost:9900/api/auth/oauth2/register") for i := 0; i < b.N; i++ { - uri, _ := cl.Generate(&Register{Id: 39, Password: "21dsadsad", platform: "google", Expired: time.Now()}) + uri, _ := cl.Generate(&Register{Id: 39, Password: "21dsadsad", platform: "google", Expired: time.Now().UTC()}) u, _ := url.Parse(uri) token := u.Query()["token"] cl.Decrypt(token[0]) @@ -125,7 +125,7 @@ func TestConfirmationLink(t *testing.T) { key := "21321321" cl := NewConfirmationLink[Register](key, "http://localhost:9900/api/auth/oauth2/register") - uri, _ := cl.Generate(&Register{Id: 39, Password: "21dsadsad", platform: "google", Expired: time.Now()}) + uri, _ := cl.Generate(&Register{Id: 39, Password: "21dsadsad", platform: "google", Expired: time.Now().UTC()}) log.Println(uri) u, _ := url.Parse(uri) diff --git a/utils/auth/jwt_token.go b/utils/auth/jwt_token.go index 0558229c..c5c99252 100644 --- a/utils/auth/jwt_token.go +++ b/utils/auth/jwt_token.go @@ -5,14 +5,17 @@ import ( "encoding/json" "errors" "fmt" + "log" "net/http" "strings" ) +var DefaultJwtSecret uint64 = 21321321321 + func ParseJwtTokenHeader[T any](r *http.Request) (string, *T, error) { //TODO: - var u T - return "", &u, nil + // var u T + // return "", &u, nil AuthKey := r.Header.Get("Authorization") if AuthKey == "" { @@ -28,7 +31,51 @@ func ParseJwtTokenHeader[T any](r *http.Request) (string, *T, error) { return "", nil, fmt.Errorf("Invalid JWT token") } - payload, err := base64.URLEncoding.DecodeString(parts[1]) + payload, err := base64.RawURLEncoding.DecodeString(parts[1]) + if err != nil { + return "", nil, fmt.Errorf("Error unmarshalling JWT DecodeString: %s", err.Error()) + } + + var p T + err = json.Unmarshal(payload, &p) + if err != nil { + return "", nil, fmt.Errorf("Error unmarshalling JWT payload: %s", err) + } + + return AuthKey, &p, nil + + // token, err := jwt.Parse(AuthKey, func(token *jwt.Token) (interface{}, error) { + // // 检查签名方法是否为 HS256 + // if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { + // return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"]) + // } + // // 返回用于验证签名的密钥 + // return []byte(svcCtx.Config.Auth.AccessSecret), nil + // }) + // if err != nil { + // return nil, errors.New(fmt.Sprint("Error parsing token:", err)) + // } + + // // 验证成功返回 + // if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid { + // return claims, nil + // } + + // return nil, errors.New(fmt.Sprint("Invalid token", err)) +} + +func TParseJwtTokenHeader[T any](AuthKey string) (string, *T, error) { + //TODO: + // var u T + // return "", &u, nil + + parts := strings.Split(AuthKey, ".") + if len(parts) != 3 { + return "", nil, fmt.Errorf("Invalid JWT token") + } + + payload, err := base64.RawURLEncoding.DecodeString(parts[1]) + log.Println(string(payload)) if err != nil { return "", nil, fmt.Errorf("Error unmarshalling JWT DecodeString: %s", err.Error()) } diff --git a/utils/auth/register.go b/utils/auth/register.go index bd69f8d1..9857abb4 100644 --- a/utils/auth/register.go +++ b/utils/auth/register.go @@ -20,6 +20,7 @@ type RegisterToken struct { Email string // email Password string // 密码 Platform string // 平台 + TraceId string //链路Id CreateAt time.Time // 创建时间 } @@ -29,6 +30,7 @@ type ResetToken struct { Wid string // websocket 通道id Email string // email OldPassword string // 旧密码 + TraceId string //链路Id CreateAt time.Time // 创建时间 } diff --git a/utils/auth/user.go b/utils/auth/user.go index 14b676e1..aaa7c357 100644 --- a/utils/auth/user.go +++ b/utils/auth/user.go @@ -216,7 +216,7 @@ func getJwtClaims(AuthKey string, AccessSecret *string) (jwt.MapClaims, error) { func PasswordHash(pwd string) string { h := sha256.New() h.Write([]byte(pwd)) - return base64.URLEncoding.EncodeToString(h.Sum(nil)) + return base64.RawURLEncoding.EncodeToString(h.Sum(nil)) } func CheckValueRange[T comparable](v T, rangevalues ...T) bool { diff --git a/utils/auth/user_test.go b/utils/auth/user_test.go index c6e11655..d04bc48e 100644 --- a/utils/auth/user_test.go +++ b/utils/auth/user_test.go @@ -11,7 +11,7 @@ import ( // TestGenJwt 测试jwt序列化 func TestGenJwt(t *testing.T) { - now := time.Now().Unix() + now := time.Now().UTC().Unix() secret := "fusen123" a, err := GenerateJwtToken(&secret, 3600, now, 123, 1234) if err != nil { @@ -34,7 +34,7 @@ func TestGenJwt(t *testing.T) { } func TestGenBackendJwt(t *testing.T) { - now := time.Now().Unix() + now := time.Now().UTC().Unix() secret := "fusen_backend_2023" a, err := GenerateBackendJwtToken(&secret, 3600*24*7, now, 1, 1) if err != nil { @@ -60,10 +60,10 @@ func TestCase1(t *testing.T) { a := sha256.New() a.Write([]byte("fusen_backend_3021")) - base64.URLEncoding.EncodeToString(a.Sum(nil)) + base64.RawURLEncoding.EncodeToString(a.Sum(nil)) as := fmt.Sprintf("%x", a.Sum(nil)) - log.Println(as, len(as), base64.URLEncoding.EncodeToString(a.Sum(nil))) + log.Println(as, len(as), base64.RawURLEncoding.EncodeToString(a.Sum(nil))) // b := sha256.New().Sum([]byte("fusen_backend_2022")) // bs := fmt.Sprintf("%x", b) diff --git a/utils/basic/request_parse.go b/utils/basic/request_parse.go index 42151525..12407179 100644 --- a/utils/basic/request_parse.go +++ b/utils/basic/request_parse.go @@ -2,7 +2,9 @@ package basic import ( "errors" + "log" + "fusenapi/shared" "fusenapi/utils/auth" "net/http" "reflect" @@ -10,10 +12,9 @@ import ( "github.com/golang-jwt/jwt" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/rest/httpx" + "gorm.io/gorm" ) -var DefaultJwtSecret uint64 = 21321321321 - type IJWTParse interface { ParseJwtToken(r *http.Request) (jwt.MapClaims, error) } @@ -52,6 +53,7 @@ func NormalAfterLogic(w http.ResponseWriter, r *http.Request, resp *Response) { } func RequestParse(w http.ResponseWriter, r *http.Request, svcCtx any, LogicRequest any) (*auth.UserInfo, error) { +<<<<<<< HEAD // log.Println(io.ReadAll(r.Body)) // token, info, err := auth.ParseJwtTokenHeader[auth.UserInfo](r) //解析Token头, 和payload信息 @@ -112,7 +114,73 @@ func RequestParse(w http.ResponseWriter, r *http.Request, svcCtx any, LogicReque // // 白板用户 // userinfo = &auth.UserInfo{UserId: 0, GuestId: 0} // } +======= + var userinfo *auth.UserInfo +>>>>>>> 7a02b3242e440880fd2afaada55a10d50823ee92 var err error + // log.Println(io.ReadAll(r.Body)) + token, info, err := auth.ParseJwtTokenHeader[auth.UserInfo](r) //解析Token头, 和payload信息 + if err != nil { + logx.Error(err) + return nil, err + } + + if token == "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjQyOTM0NjIsImd1ZXN0X2lkIjozNiwiaWF0IjozMTUzNjAwMCwidXNlcl9pZCI6MH0.T7PuRSrinlJu7ZZ1DVqUJLWXhY_6R1lXiUHaNdw35hU" { + userinfo = info + } else { + var secret uint64 = 0 + if info != nil { + + if info.IsUser() { + // us, err := state.GetUserState(info.UserId) //获取缓存的用户状态 + reflect.ValueOf(svcCtx) + ctxValue := reflect.ValueOf(svcCtx).FieldByName("MysqlConn") + gdb := ctxValue.Interface().(*gorm.DB) + + us, err := shared.GetUserState(info.UserId, gdb) + if err != nil { + logx.Error(err) + return nil, err + } + secret = us.PwdHash // 获取密码的hash做jwt, 便于重置密码的使用 + + } else if info.IsGuest() { + secret = auth.DefaultJwtSecret //获取默认的hash + } + } + + if secret != 0 { + claims, err := auth.ParseJwtTokenUint64Secret(token, secret) + // 如果解析JWT token出错,则返回未授权的JSON响应并记录错误消息 + if err != nil { + log.Println(token) + httpx.OkJsonCtx(r.Context(), w, &Response{ + Code: 401, // 返回401状态码,表示未授权 + Message: "unauthorized", // 返回未授权信息 + }) + logx.Info("unauthorized:", err.Error()) // 记录错误日志 + return nil, err + } + + if claims != nil { + // 从token中获取对应的用户信息 + userinfo, err = auth.GetUserInfoFormMapClaims(claims) + // 如果获取用户信息出错,则返回未授权的JSON响应并记录错误消息 + if err != nil { + httpx.OkJsonCtx(r.Context(), w, &Response{ + Code: 401, + Message: "unauthorized", + }) + logx.Info("unauthorized:", err.Error()) + return nil, err + } + } + } else { + // 白板用户 + userinfo = &auth.UserInfo{UserId: 0, GuestId: 0} + } + } + // 如果端点有请求结构体,则使用httpx.Parse方法从HTTP请求体中解析请求数据 if err = httpx.Parse(r, LogicRequest); err != nil { httpx.OkJsonCtx(r.Context(), w, &Response{ @@ -122,7 +190,7 @@ func RequestParse(w http.ResponseWriter, r *http.Request, svcCtx any, LogicReque logx.Error(err) return nil, err } - userinfo := &auth.UserInfo{UserId: 39} + // userinfo := &auth.UserInfo{UserId: 39} return userinfo, err } diff --git a/utils/basic/request_parse_test.go b/utils/basic/request_parse_test.go new file mode 100644 index 00000000..85e7325f --- /dev/null +++ b/utils/basic/request_parse_test.go @@ -0,0 +1,12 @@ +package basic + +import ( + "fusenapi/utils/auth" + "log" + "testing" +) + +func TestRequestParse(t *testing.T) { + a, us, err := auth.TParseJwtTokenHeader[auth.UserInfo]("saTGjruwq7SG4vnQVEo3vsZvbfhzx8zZ3zWA+8nWVdid5tssnYQNECiP+pYCK6YhZ+LRH8m7f7JXrgyqtpYQMOhVOcNWTYAClk0Jnft6+QIPegzY9+v4k7eVMiWf5c/x") + log.Println(a, us, err) +} diff --git a/utils/encryption_decryption/aes_crt.go b/utils/encryption_decryption/aes_crt.go index 0720ccd0..7e612d52 100644 --- a/utils/encryption_decryption/aes_crt.go +++ b/utils/encryption_decryption/aes_crt.go @@ -27,7 +27,7 @@ func NewSecretCRT[T any](key string, iv string) *SecretCRT[T] { s := &SecretCRT[T]{ derivationKey: DerivationKeyV1, iv: []byte(iv), - EncDec: base64.URLEncoding, + EncDec: base64.RawURLEncoding, } s.secretKey = s.derivationKey(key) return s diff --git a/utils/encryption_decryption/aes_gcm.go b/utils/encryption_decryption/aes_gcm.go index 25bee8b7..45e2dc00 100644 --- a/utils/encryption_decryption/aes_gcm.go +++ b/utils/encryption_decryption/aes_gcm.go @@ -49,7 +49,7 @@ func NewSecretGCM[T any](key string) *SecretGCM[T] { s := &SecretGCM[T]{ srcKey: key, derivationKey: DerivationKeyV1, - EncDec: base64.URLEncoding, + EncDec: base64.RawURLEncoding, } s.secretKey = s.derivationKey(s.srcKey) return s diff --git a/utils/id_generator/unique_number.go b/utils/id_generator/unique_number.go index 59edda26..a1411418 100644 --- a/utils/id_generator/unique_number.go +++ b/utils/id_generator/unique_number.go @@ -8,6 +8,6 @@ import ( ) func GenSnNum() string { - a := fmt.Sprintf("%s%.8d", time.Now().Format("20060102150405.000"), rand.Intn(1000000)) + a := fmt.Sprintf("%s%.8d", time.Now().UTC().Format("20060102150405.000"), rand.Intn(1000000)) return strings.ReplaceAll(a, ".", "") } diff --git a/utils/image/image_size.go b/utils/image/image_size.go index a502e571..71659f56 100644 --- a/utils/image/image_size.go +++ b/utils/image/image_size.go @@ -60,7 +60,7 @@ func ThousandFaceImageFormat(req *ThousandFaceImageFormatReq) { req.Cover = "" req.CoverDefault = req.CoverImg if req.Size >= 200 && len(coverSlice) >= 2 && len(coverImgSlice) >= 2 { - req.CoverImg = fmt.Sprintf("%s/test/%d/%d_%d.png?%d", constants.DOMAIN_RENDER_IMG_NAME, req.UserId, req.UserId, req.ProductId, time.Now().Unix()) + req.CoverImg = fmt.Sprintf("%s/test/%d/%d_%d.png?%d", constants.DOMAIN_RENDER_IMG_NAME, req.UserId, req.UserId, req.ProductId, time.Now().UTC().Unix()) req.CoverDefault = fmt.Sprintf("%s_%d.%s", coverImgSlice[0], req.Size, coverImgSlice[1]) } } diff --git a/utils/websocket_data/render_data.go b/utils/websocket_data/render_data.go index 0a88f86b..e6ddfb93 100644 --- a/utils/websocket_data/render_data.go +++ b/utils/websocket_data/render_data.go @@ -27,14 +27,13 @@ type RenderData struct { // websocket发送渲染完的数据 type RenderImageRspMsg struct { - RenderId string `json:"render_id"` //渲染id - Image string `json:"image"` //渲染结果图片 + RenderId string `json:"render_id"` //渲染id + Image string `json:"image"` //渲染结果图片 + RenderProcessTime RenderProcessTime `json:"render_process_time"` //流程耗时 +} +type RenderProcessTime struct { CombineTakesTime string `json:"combine_takes_time"` //合图需要时间 UnityRenderTakesTime string `json:"unity_render_takes_time"` //unity渲染用时 UploadCombineImageTakesTime string `json:"upload_combine_image_takes_time"` //上传刀版图耗时 UploadUnityRenderImageTakesTime string `json:"upload_unity_render_image_takes_time"` //上传unity渲染结果图时间 } -type ThirdPartyLoginRspMsg struct { - //websocket三方登录的通知数据 - Token string `json:"token"` -} diff --git a/utils/wevent/base_event.go b/utils/wevent/base_event.go new file mode 100644 index 00000000..5bf4e6fd --- /dev/null +++ b/utils/wevent/base_event.go @@ -0,0 +1,67 @@ +package wevent + +import ( + "time" +) + +// 和前端交流的事件机制 +type EventType string + +const ( + UserEmailRegister EventType = "E_USER_EMAIL_REGISTER" // 用户注册 +) + +// WebsocketEvent 所有事件都必须继承这个结构体 +type WebsocketEvent struct { + Type EventType `json:"event_type"` // 事件 + SenderTime time.Time `json:"sender_time"` // 发送的时间, 可能用来统计时间 + TraceId string `json:"trace_id"` // 链路ID + Code int `json:"code"` // 状态码 + Message string `json:"msg"` // 描述 + Data any `json:"data"` // 关注的数据 +} + +// NewWebsocketEvent 创建一个Websocket事件 +func NewWebsocketEvent(etype EventType, TraceId string) *WebsocketEvent { + return &WebsocketEvent{ + Type: etype, + SenderTime: time.Now().UTC(), + TraceId: TraceId, + } +} + +// NewWebsocketEventSuccess 创建一个Websocket事件伴随Code(200) +func NewWebsocketEventSuccess(etype EventType, TraceId string) *WebsocketEvent { + return &WebsocketEvent{ + Type: etype, + SenderTime: time.Now().UTC(), + TraceId: TraceId, + Code: 200, + Message: "success", + } +} + +func (event *WebsocketEvent) WithData(data any) *WebsocketEvent { + event.Data = data + return event +} + +func (event *WebsocketEvent) WithMessgae(msg string) *WebsocketEvent { + event.Message = msg + return event +} + +func (event *WebsocketEvent) WithCode(code int) *WebsocketEvent { + event.Code = code + return event +} + +func (event *WebsocketEvent) WithTraceId(traceId string) *WebsocketEvent { + event.TraceId = traceId + return event +} + +func (event *WebsocketEvent) WithSenderTime(senderTime time.Time) *WebsocketEvent { + event.SenderTime = senderTime + return event +} diff --git a/utils/wevent/event.go b/utils/wevent/event.go new file mode 100644 index 00000000..201324a7 --- /dev/null +++ b/utils/wevent/event.go @@ -0,0 +1,6 @@ +package wevent + +// 用户注册成功的事件关注的数据 +type DataEmailRegister struct { + JwtToken string `json:"token"` // 注册成功的事件码 +} diff --git a/utils/wevent/event_test.go b/utils/wevent/event_test.go new file mode 100644 index 00000000..8e8e8576 --- /dev/null +++ b/utils/wevent/event_test.go @@ -0,0 +1,14 @@ +package wevent + +import ( + "encoding/json" + "log" + "testing" + + "github.com/google/uuid" +) + +func TestEvent(t *testing.T) { + data, _ := json.Marshal(NewWebsocketEvent(UserEmailRegister, uuid.NewString()).WithCode(200)) + log.Println(string(data)) +}