Improve upload handling

Hiven 2023-08-02 11:13:28 +08:00
parent dab5320444
commit d75bc8f2be
5 changed files with 186 additions and 67 deletions

View File

@@ -1,9 +1,12 @@
 package logic
 
 import (
+	"encoding/json"
+	"fmt"
 	"fusenapi/model/gmodel"
 	"fusenapi/utils/auth"
 	"fusenapi/utils/basic"
+	"fusenapi/utils/hash"
 	"io"
 	"net/http"
 	"time"
@@ -54,6 +57,7 @@ func (l *UploadFilesBackendLogic) UploadFilesBackend(req *types.UploadFilesReq,
 	}
 
 	// Define the user ID and the S3 key format
+	var uid int64
 	var userId int64
 	var guestId int64
@@ -61,16 +65,21 @@ func (l *UploadFilesBackendLogic) UploadFilesBackend(req *types.UploadFilesReq,
 	if userinfo.IsGuest() {
 		// If it is, use the guest ID and the guest key format
 		guestId = userinfo.GuestId
+		uid = guestId
 	} else {
 		// Otherwise use the user ID and the user key format
 		userId = userinfo.UserId
+		uid = userId
 	}
 
-	var aa = make([]types.UploadInfo, 2)
-	aa[0].FileKeys = "202308011632"
-	aa[1].FileKeys = "202308011633"
-	req.UploadInfo = aa
-	var fileLen = len(req.UploadInfo)
+	var uploadInfoList []UploadInfo
+	err := json.Unmarshal([]byte(req.UploadInfo), &uploadInfoList)
+	if err != nil {
+		logx.Error(err)
+		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err,params Unmarshal failed")
+	}
+	var fileLen = len(uploadInfoList)
 	if fileLen == 0 {
 		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err,no files")
@@ -78,6 +87,18 @@ func (l *UploadFilesBackendLogic) UploadFilesBackend(req *types.UploadFilesReq,
 	if req.ApiType == 1 && fileLen > 100 {
 		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err, files count is beyond the maximum")
 	}
+
+	// Define the bucket name
+	var bucketName *string
+	// Choose the bucket based on the category
+	switch req.UploadBucket {
+	case 2:
+		bucketName = basic.TempfileBucketName
+	default:
+		bucketName = basic.StorageBucketName
+	}
+
 	// Set the memory size
 	l.r.ParseMultipartForm(32 << 20)
@@ -93,10 +114,9 @@ func (l *UploadFilesBackendLogic) UploadFilesBackend(req *types.UploadFilesReq,
 	// Define the S3 request and the current time
 	var s3req *request.Request
-	var uploadBucket = req.UploadBucket
 
 	resourceModel := gmodel.NewFsResourceModel(l.svcCtx.MysqlConn)
 	result, err := mr.MapReduce(func(source chan<- interface{}) {
-		for i, info := range req.UploadInfo {
+		for i, info := range uploadInfoList {
 			fileType := files[i].Header.Get("Content-Type")
 			// Open the file
 			file, err := files[i].Open()
@@ -111,34 +131,36 @@ func (l *UploadFilesBackendLogic) UploadFilesBackend(req *types.UploadFilesReq,
 			}
 
 			// A series of business logic... validate type, file size
-			var fileKey string = info.FileKeys
-			source <- uploadData{
-				FileKey:  fileKey,
+			var hashKey string = hash.JsonHashKey(fmt.Sprintf("%s%d", info.FileKeys, uid))
+			source <- UploadData{
+				FileKey:  info.FileKeys,
 				FileType: fileType,
 				Metadata: info.Metadata,
 				FileData: ioData,
 				ApiType:  req.ApiType,
-				Bucket:   uploadBucket,
+				Bucket:   bucketName,
+				HashKey:  hashKey,
 			}
 		}
 	}, func(item interface{}, writer mr.Writer[interface{}], cancel func(error)) {
-		var uploadUrl = uploadUrl{}
-		uploadDataInfo := item.(uploadData)
+		var uploadUrl = UploadUrl{}
+		uploadDataInfo := item.(UploadData)
 
-		var resourceId string = uploadDataInfo.FileKey
+		var resourceId string = uploadDataInfo.HashKey
 		// Query the database
 		resourceInfo, err := resourceModel.FindOneById(l.ctx, resourceId)
 		if err == nil && resourceInfo.ResourceId != "" {
 			uploadUrl.Status = 1
 			uploadUrl.ResourceId = resourceId
 			uploadUrl.Url = *resourceInfo.ResourceUrl
-			uploadUrl.Key = resourceId
+			uploadUrl.Key = uploadDataInfo.FileKey
 		} else {
 			// Create the S3 object storage request
 			s3req, _ = svc.PutObjectRequest(
 				&s3.PutObjectInput{
-					Bucket: &uploadDataInfo.Bucket,
-					Key:    &uploadDataInfo.FileKey,
+					Bucket: uploadDataInfo.Bucket,
+					Key:    &uploadDataInfo.HashKey,
 				},
 			)
@@ -184,11 +206,11 @@ func (l *UploadFilesBackendLogic) UploadFilesBackend(req *types.UploadFilesReq,
 		// Notice: this must be added!
 		writer.Write(uploadUrl)
 	}, func(pipe <-chan interface{}, writer mr.Writer[interface{}], cancel func(error)) {
-		var uploadUrlList = make(map[string][]*uploadUrl)
-		var uploadUrlListFail []*uploadUrl
-		var uploadUrlListSuccess []*uploadUrl
+		var uploadUrlList = make(map[string][]*UploadUrl)
+		var uploadUrlListFail []*UploadUrl
+		var uploadUrlListSuccess []*UploadUrl
 		for p := range pipe {
-			var uploadUrl = p.(uploadUrl)
+			var uploadUrl = p.(UploadUrl)
 			if uploadUrl.Status == 1 {
 				uploadUrlListSuccess = append(uploadUrlListSuccess, &uploadUrl)
 			} else {
@@ -211,17 +233,24 @@ func (l *UploadFilesBackendLogic) UploadFilesBackend(req *types.UploadFilesReq,
 	})
 }
 
-type uploadData struct {
+type UploadInfo struct {
+	FileSize int64  `json:"file_size"` // Unique upload identifier info
+	FileKeys string `json:"file_keys"` // Unique upload identifier info
+	Metadata string `json:"meta_data"` // Extra info for the uploaded file
+}
+
+type UploadData struct {
 	ApiType  int64  `json:"api_type"`
 	FileSize int64  `json:"file_size"`
 	FileType string `json:"file_type"`
 	FileKey  string `json:"file_key"`
 	Metadata string `json:"metadata"`
-	Bucket   string `json:"bucket"`
+	Bucket   *string `json:"bucket"`
+	HashKey  string  `json:"hash_key"`
 	FileData []byte `fsfile:"data"`
 }
 
-type uploadUrl struct {
+type UploadUrl struct {
 	Status     int64  `json:"status"`
 	ResourceId string `json:"resource_id"`
 	Url        string `json:"url"`
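
The substance of the backend change: the S3 object key and the resource ID are no longer the raw FileKeys but a per-user hash, so a repeat upload of the same key by the same user can be answered from the existing resource record (FsResourceModel) instead of presigning a new request. A minimal standalone sketch of that decision logic follows; the in-memory map stands in for resourceModel.FindOneById, and everything except hash.JsonHashKey is illustrative:

package main

import (
	"fmt"

	"fusenapi/utils/hash"
)

// existingResources stands in for the resource table consulted via
// resourceModel.FindOneById in the real handler (illustrative only).
var existingResources = map[string]string{}

func uploadURL(fileKeys string, uid int64) string {
	// Same per-user key derivation as the handler.
	resourceId := hash.JsonHashKey(fmt.Sprintf("%s%d", fileKeys, uid))

	if url, ok := existingResources[resourceId]; ok {
		// Hit: this user already uploaded this key, reuse the stored URL.
		return url
	}
	// Miss: presign a new PutObject request with Key = resourceId (omitted here),
	// then record the resulting URL.
	url := "https://example-bucket.s3.amazonaws.com/" + resourceId
	existingResources[resourceId] = url
	return url
}

func main() {
	fmt.Println(uploadURL("202308011632", 39))
	fmt.Println(uploadURL("202308011632", 39)) // second call reuses the first URL
}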

View File

@@ -1,8 +1,11 @@
 package logic
 
 import (
+	"encoding/json"
+	"fmt"
 	"fusenapi/utils/auth"
 	"fusenapi/utils/basic"
+	"fusenapi/utils/hash"
 	"time"
 
 	"context"
@@ -42,25 +45,30 @@ func NewUploadFilesFrontendLogic(ctx context.Context, svcCtx *svc.ServiceContext
 func (l *UploadFilesFrontendLogic) UploadFilesFrontend(req *types.UploadFilesReq, userinfo *auth.UserInfo) (resp *basic.Response) {
 	// The return value must be returned again via Set; resp can be called as a nil pointer: resp.SetStatus(basic.CodeOK, data)
 	// userinfo is never null when it is passed in
 
 	// Define the user ID and the S3 key format
-	// var userId int64
-	// var guestId int64
+	var uid int64
+	var userId int64
+	var guestId int64
 
 	// Check whether the user is a guest
-	// if userinfo.IsGuest() {
-	// 	// If it is, use the guest ID and the guest key format
-	// 	guestId = userinfo.GuestId
-	// } else {
-	// 	// Otherwise use the user ID and the user key format
-	// 	userId = userinfo.UserId
-	// }
+	if userinfo.IsGuest() {
+		// If it is, use the guest ID and the guest key format
+		guestId = userinfo.GuestId
+		uid = guestId
+	} else {
+		// Otherwise use the user ID and the user key format
+		userId = userinfo.UserId
+		uid = userId
+	}
 
-	var aa = make([]types.UploadInfo, 2)
-	aa[0].FileKeys = "202308011632"
-	aa[1].FileKeys = "202308011633"
-	req.UploadInfo = aa
-	var fileLen = len(req.UploadInfo)
+	var uploadInfoList []UploadInfo
+	err := json.Unmarshal([]byte(req.UploadInfo), &uploadInfoList)
+	if err != nil {
+		logx.Error(err)
+		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err,params Unmarshal failed")
+	}
+	var fileLen = len(uploadInfoList)
 	if fileLen == 0 {
 		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err,no files")
@@ -69,7 +77,16 @@ func (l *UploadFilesFrontendLogic) UploadFilesFrontend(req *types.UploadFilesReq
 		return resp.SetStatus(basic.CodeFileUploadErr, "file upload err, files count is beyond the maximum")
 	}
 
-	var uploadBucket = req.UploadBucket
+	// Define the bucket name
+	var bucketName *string
+	// Choose the bucket based on the category
+	switch req.UploadBucket {
+	case 2:
+		bucketName = basic.TempfileBucketName
+	default:
+		bucketName = basic.StorageBucketName
+	}
 
 	// Set the AWS session region
 	l.svcCtx.AwsSession.Config.Region = aws.String("us-west-1")
@@ -78,25 +95,26 @@ func (l *UploadFilesFrontendLogic) UploadFilesFrontend(req *types.UploadFilesReq
 	svc := s3.New(l.svcCtx.AwsSession)
 
 	result, err := mr.MapReduce(func(source chan<- interface{}) {
-		for _, info := range req.UploadInfo {
+		for _, info := range uploadInfoList {
 			if info.FileSize <= 1024*1024*500 {
 				// A series of business logic... validate type, file size
-				var fileKey string = info.FileKeys
-				source <- uploadData{
-					FileKey:  fileKey,
+				var hashKey string = hash.JsonHashKey(fmt.Sprintf("%s%d", info.FileKeys, uid))
+				source <- UploadData{
+					FileKey:  info.FileKeys,
 					FileSize: info.FileSize,
-					Bucket:   uploadBucket,
+					Bucket:   bucketName,
+					HashKey:  hashKey,
 				}
 			}
 		}
 	}, func(item interface{}, writer mr.Writer[interface{}], cancel func(error)) {
-		var uploadUrl = uploadUrl{}
-		uploadDataInfo := item.(uploadData)
+		var uploadUrl = UploadUrl{}
+		uploadDataInfo := item.(UploadData)
 		s3req, _ := svc.PutObjectRequest(
 			&s3.PutObjectInput{
-				Bucket:        &uploadBucket,
-				Key:           &uploadDataInfo.FileKey,
+				Bucket:        uploadDataInfo.Bucket,
+				Key:           &uploadDataInfo.HashKey,
 				ContentLength: aws.Int64(uploadDataInfo.FileSize),
 			},
 		)
@@ -106,22 +124,24 @@ func (l *UploadFilesFrontendLogic) UploadFilesFrontend(req *types.UploadFilesReq
 			logx.Error(err)
 			uploadUrl.Status = 0
 			uploadUrl.Url = ""
+			uploadUrl.ResourceId = uploadDataInfo.HashKey
 			uploadUrl.Key = uploadDataInfo.FileKey
 		} else {
 			// Log the request URL
 			logx.Info(url)
 			uploadUrl.Status = 1
 			uploadUrl.Url = url
+			uploadUrl.ResourceId = uploadDataInfo.HashKey
 			uploadUrl.Key = uploadDataInfo.FileKey
 		}
 		// Notice: this must be added!
 		writer.Write(uploadUrl)
 	}, func(pipe <-chan interface{}, writer mr.Writer[interface{}], cancel func(error)) {
-		var uploadUrlList = make(map[string][]*uploadUrl)
-		var uploadUrlListFail []*uploadUrl
-		var uploadUrlListSuccess []*uploadUrl
+		var uploadUrlList = make(map[string][]*UploadUrl)
+		var uploadUrlListFail []*UploadUrl
+		var uploadUrlListSuccess []*UploadUrl
 		for p := range pipe {
-			var uploadUrl = p.(uploadUrl)
+			var uploadUrl = p.(UploadUrl)
 			if uploadUrl.Status == 1 {
 				uploadUrlListSuccess = append(uploadUrlListSuccess, &uploadUrl)
 			} else {
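
The presigning itself happens in the unchanged part of the handler (the url that gets logged and written above). For reference, a minimal standalone sketch of producing a presigned PUT URL with aws-sdk-go, using placeholder region, bucket, key, and size values in place of the handler's configuration, bucketName, HashKey, and FileSize:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// Placeholder session; the handlers reuse l.svcCtx.AwsSession instead.
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-west-1")}))
	svc := s3.New(sess)

	// Build the PutObject request without sending it.
	s3req, _ := svc.PutObjectRequest(&s3.PutObjectInput{
		Bucket:        aws.String("example-bucket"),
		Key:           aws.String("example-hash-key"),
		ContentLength: aws.Int64(1024),
	})

	// Presign the PUT request so the client can upload the file directly.
	url, err := s3req.Presign(15 * time.Minute)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(url)
}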

View File

@@ -8,13 +8,13 @@ import (
 type UploadInfo struct {
 	FileSize int64  `form:"file_size,optional"` // Unique upload identifier info
 	FileKeys string `form:"file_keys,optional"` // Unique upload identifier info
-	Metadata string `form:"file_keys,optional"` // Extra info for the uploaded file
+	Metadata string `form:"meta_data,optional"` // Extra info for the uploaded file
 }
 
 type UploadFilesReq struct {
 	ApiType      int64        `form:"api_type,options=[1,2],default=1"` // Call type: 1 = external, 2 = internal
-	UploadBucket string       `form:"upload_bucket"`                    // Upload bucket name
-	UploadInfo   []UploadInfo `form:"upload_info,optional"`             // Upload info
+	UploadBucket int64        `form:"upload_bucket,options=[1,2],default=1"` // Upload bucket: 1 = cache, 2 = persistent
+	UploadInfo   string       `form:"upload_info"`                           // Upload info (JSON)
 }
 
 type UploadCallbackReq struct {
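
With upload_info now a single JSON string rather than repeated form fields, the request is expected to carry an array of objects keyed by the json tags of the logic-side UploadInfo struct (file_size, file_keys, meta_data). A minimal sketch of that decoding, with placeholder values:

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Mirrors the UploadInfo struct added in the upload logic of this commit.
type UploadInfo struct {
	FileSize int64  `json:"file_size"` // size of the file to upload
	FileKeys string `json:"file_keys"` // caller-side unique key for the file
	Metadata string `json:"meta_data"` // extra info for the uploaded file
}

func main() {
	// Example payload for the upload_info form field (values are placeholders).
	raw := `[
		{"file_size": 1024, "file_keys": "202308011632", "meta_data": ""},
		{"file_size": 2048, "file_keys": "202308011633", "meta_data": ""}
	]`

	var uploadInfoList []UploadInfo
	if err := json.Unmarshal([]byte(raw), &uploadInfoList); err != nil {
		log.Fatal("params Unmarshal failed: ", err)
	}
	fmt.Printf("%d files, first key %s\n", len(uploadInfoList), uploadInfoList[0].FileKeys)
}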

View File

@@ -39,13 +39,13 @@ type (
 	UploadInfo {
 		FileSize int64  `form:"file_size,optional"` // Unique upload identifier info
 		FileKeys string `form:"file_keys,optional"` // Unique upload identifier info
-		Metadata string `form:"file_keys,optional"` // Extra info for the uploaded file
+		Metadata string `form:"meta_data,optional"` // Extra info for the uploaded file
 	}
 
 	UploadFilesReq {
 		ApiType      int64        `form:"api_type,options=[1,2],default=1"` // Call type: 1 = external, 2 = internal
-		UploadBucket string       `form:"upload_bucket"`                    // Upload bucket name
-		UploadInfo   []UploadInfo `form:"upload_info,optional"`             // Upload info
+		UploadBucket int64        `form:"upload_bucket,options=[1,2],default=1"` // Upload bucket: 1 = cache, 2 = persistent
+		UploadInfo   string       `form:"upload_info"`                           // Upload info (JSON)
 	}
 
 	UploadCallbackReq {
 		FileType string `form:"file_type"` // File type / fbx / hdr

utils/hash/hash.go (new file, 70 lines)
View File

@@ -0,0 +1,70 @@
+package hash
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"encoding/json"
+	"fmt"
+	"sort"
+)
+
+func JsonHashKey(v interface{}) string {
+	h := sha256.New()
+	h.Write(marshalOrdered(v))
+	return fmt.Sprintf("%x", h.Sum(nil))
+}
+
+func marshalOrdered(v interface{}) []byte {
+	switch v := v.(type) {
+	case map[string]interface{}:
+		sortedKeys := make([]string, 0, len(v))
+		for key := range v {
+			sortedKeys = append(sortedKeys, key)
+		}
+		sort.Strings(sortedKeys)
+
+		var buf bytes.Buffer
+		buf.WriteByte('{')
+		for i, key := range sortedKeys {
+			if i > 0 {
+				buf.WriteByte(',')
+			}
+			b, err := json.Marshal(key)
+			if err != nil {
+				panic(err)
+			}
+			buf.Write(b)
+			buf.WriteByte(':')
+			b = marshalOrdered(v[key])
+			buf.Write(b)
+		}
+		buf.WriteByte('}')
+		return buf.Bytes()
+	case []interface{}:
+		var buf bytes.Buffer
+		sort.Slice(v, func(i, j int) bool {
+			return bytes.Compare(marshalOrdered(v[i]), marshalOrdered(v[j])) == 1
+		})
+		buf.WriteByte('[')
+		for i, val := range v {
+			if i > 0 {
+				buf.WriteByte(',')
+			}
+			b := marshalOrdered(val)
+			buf.Write(b)
+		}
+		buf.WriteByte(']')
+		return buf.Bytes()
+	default:
+		b, err := json.Marshal(v)
+		if err != nil {
+			panic(err)
+		}
+		return b
+	}
+}
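
A short usage note on the new helper (example values only): the upload handlers pass a plain string, but JsonHashKey also accepts decoded JSON, where map keys are serialized in sorted order so structurally equal objects hash to the same key.

package main

import (
	"fmt"

	"fusenapi/utils/hash"
)

func main() {
	// Two maps with the same entries in different order produce the same key,
	// because marshalOrdered sorts map keys before hashing.
	a := map[string]interface{}{"file_keys": "202308011632", "file_size": 1024}
	b := map[string]interface{}{"file_size": 1024, "file_keys": "202308011632"}
	fmt.Println(hash.JsonHashKey(a) == hash.JsonHashKey(b)) // true

	// String usage, as in the upload handlers: a 64-character hex digest
	// derived from the user-scoped file key (uid value is a placeholder).
	fmt.Println(hash.JsonHashKey(fmt.Sprintf("%s%d", "202308011632", int64(39))))
}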