diff --git a/constants/orders.go b/constants/orders.go index 070b96ac..34b7ff71 100644 --- a/constants/orders.go +++ b/constants/orders.go @@ -56,7 +56,10 @@ const ( type OrderStatusCode int64 const ( - ORDER_STATUS_UNPAIDDEPOSIT OrderStatusCode = 0 // 0,未支付定金 + ORDER_STATUS_UNPAIDDEPOSIT OrderStatusCode = 0 // 0,未支付定金 + ORDER_STATUS_CLOSE OrderStatusCode = 10 // 10,订单已关闭 + ORDER_STATUS_COMPLETE OrderStatusCode = 20 // 20,订单已完成 + ORDER_STATUS_DIRECTMAIL_ORDERED OrderStatusCode = 10100 // 10100,直邮单--已下单 ORDER_STATUS_DIRECTMAIL_ORDEREDMAINING OrderStatusCode = 10100001 // 10100001,直邮单--已下单--尾款 ORDER_STATUS_DIRECTMAIL_CANCEL OrderStatusCode = 10101 // 10101,直邮单--已取消 @@ -64,13 +67,13 @@ const ( ORDER_STATUS_DIRECTMAIL_COMPLETEPRODUCTION OrderStatusCode = 10300 // 10300,直邮单--生产完成 ORDER_STATUS_DIRECTMAIL_SHIPPED OrderStatusCode = 10400 // 10400,直邮单--已发货 ORDER_STATUS_DIRECTMAIL_ARRIVED OrderStatusCode = 10500 // 10500,直邮单--已到达 + ORDER_STATUS_CLOUDSTORE_ORDERED OrderStatusCode = 20100 // 20100,云仓单--已下单 ORDER_STATUS_CLOUDSTORE_ORDEREDMAINING OrderStatusCode = 20100001 // 20100001,云仓单--已下单-尾款 ORDER_STATUS_CLOUDSTORE_CANCEL OrderStatusCode = 20101 // 20101,云仓单--已取消 ORDER_STATUS_CLOUDSTORE_STARTPRODUCTION OrderStatusCode = 20200 // 20200,云仓单--开始生产 ORDER_STATUS_CLOUDSTORE_COMPLETEPRODUCTION OrderStatusCode = 20300 // 20300,云仓单--生产完成 ORDER_STATUS_CLOUDSTORE_ARRIVEDWAREHOUSE OrderStatusCode = 20400 // 20400,云仓单--直达仓库 - ORDER_STATUS_COMPLETE OrderStatusCode = 30000 // 30000,订单完成 ) // 订单状态名称 @@ -106,6 +109,8 @@ func init() { // 订单状态名称 OrderStatusMessage = make(map[OrderStatusCode]string) OrderStatusMessage[ORDER_STATUS_UNPAIDDEPOSIT] = "未支付定金" + OrderStatusMessage[ORDER_STATUS_CLOSE] = "订单已关闭" + OrderStatusMessage[ORDER_STATUS_COMPLETE] = "订单已完成" OrderStatusMessage[ORDER_STATUS_DIRECTMAIL_ORDERED] = "直邮单--已下单" OrderStatusMessage[ORDER_STATUS_DIRECTMAIL_ORDEREDMAINING] = "直邮单--已下单--尾款" @@ -120,18 +125,16 @@ func init() { OrderStatusMessage[ORDER_STATUS_CLOUDSTORE_COMPLETEPRODUCTION] = "云仓单--生产完成" OrderStatusMessage[ORDER_STATUS_CLOUDSTORE_ARRIVEDWAREHOUSE] = "云仓单--直达仓库" - OrderStatusMessage[ORDER_STATUS_COMPLETE] = "订单完成" - // 订单状态--用户可见--直邮 OrderStatusUserDIRECTMAIL = []OrderStatusCode{ ORDER_STATUS_UNPAIDDEPOSIT, ORDER_STATUS_DIRECTMAIL_ORDERED, ORDER_STATUS_DIRECTMAIL_STARTPRODUCTION, ORDER_STATUS_DIRECTMAIL_COMPLETEPRODUCTION, ORDER_STATUS_DIRECTMAIL_SHIPPED, ORDER_STATUS_DIRECTMAIL_ARRIVED, - ORDER_STATUS_COMPLETE, + ORDER_STATUS_COMPLETE, ORDER_STATUS_CLOSE, } // 订单状态--用户可见--云仓 OrderStatusUserCLOUDSTORE = []OrderStatusCode{ ORDER_STATUS_UNPAIDDEPOSIT, ORDER_STATUS_CLOUDSTORE_ORDERED, ORDER_STATUS_CLOUDSTORE_STARTPRODUCTION, ORDER_STATUS_CLOUDSTORE_COMPLETEPRODUCTION, ORDER_STATUS_CLOUDSTORE_ARRIVEDWAREHOUSE, - ORDER_STATUS_COMPLETE, + ORDER_STATUS_COMPLETE, ORDER_STATUS_CLOSE, } } diff --git a/constants/queue.go b/constants/queue.go new file mode 100644 index 00000000..5d415d4a --- /dev/null +++ b/constants/queue.go @@ -0,0 +1,5 @@ +package constants + +const ( + QUEUE_NAME_ORDER = "queue_order" +) diff --git a/initalize/delayMessage.go b/initalize/delayMessage.go new file mode 100644 index 00000000..a1b7cacb --- /dev/null +++ b/initalize/delayMessage.go @@ -0,0 +1,14 @@ +package initalize + +import ( + "fusenapi/utils/queue" +) + +// 初始化 +func InitDelayMessage() *queue.DelayMessage { + //创建延迟消息 + dm := queue.NewDelayMessage() + + go dm.Start() + return dm +} diff --git a/initalize/service.go b/initalize/service.go index edeee442..5b42b40a 100644 --- a/initalize/service.go +++ b/initalize/service.go @@ -2,6 +2,7 @@ package initalize import ( "fusenapi/service/repositories" + "fusenapi/utils/queue" "github.com/aws/aws-sdk-go/aws/session" "gorm.io/gorm" @@ -18,6 +19,7 @@ type NewAllRepositorieData struct { GormDB *gorm.DB BLMServiceUrl *string AwsSession *session.Session + DelayQueue *queue.DelayMessage } func NewAllRepositories(newData *NewAllRepositorieData) *Repositories { @@ -25,6 +27,6 @@ func NewAllRepositories(newData *NewAllRepositorieData) *Repositories { ImageHandle: repositories.NewImageHandle(newData.GormDB, newData.BLMServiceUrl, newData.AwsSession), NewShoppingCart: repositories.NewShoppingCart(newData.GormDB, newData.BLMServiceUrl, newData.AwsSession), NewResource: repositories.NewResource(newData.GormDB, newData.BLMServiceUrl, newData.AwsSession), - NewOrder: repositories.NewOrder(newData.GormDB, newData.BLMServiceUrl, newData.AwsSession), + NewOrder: repositories.NewOrder(newData.GormDB, newData.BLMServiceUrl, newData.AwsSession, newData.DelayQueue), } } diff --git a/server/order/internal/logic/createorderlogic.go b/server/order/internal/logic/createorderlogic.go index 802ece8d..148078e4 100644 --- a/server/order/internal/logic/createorderlogic.go +++ b/server/order/internal/logic/createorderlogic.go @@ -54,6 +54,20 @@ func (l *CreateOrderLogic) CreateOrder(req *types.CreateOrderReq, userinfo *auth return resp.SetStatus(&res.ErrorCode) } + // 延时任务 + l.svcCtx.DelayQueue.AddTask(time.Now().Add(time.Minute*30), constants.QUEUE_NAME_ORDER, func(args ...interface{}) { + ctx := context.Background() + orderSn := args[0].(string) + svcCtx := svc.ServiceContext{ + Config: l.svcCtx.Config, + Repositories: l.svcCtx.Repositories, + } + svcCtx.Repositories.NewOrder.Close(ctx, &repositories.CloseReq{ + OrderSn: orderSn, + Type: 1, + }) + }, []interface{}{res.OrderSn}) + return resp.SetStatus(basic.CodeOK, map[string]interface{}{ "order_sn": res.OrderSn, }) diff --git a/server/order/internal/svc/servicecontext.go b/server/order/internal/svc/servicecontext.go index af53eddd..74c47177 100644 --- a/server/order/internal/svc/servicecontext.go +++ b/server/order/internal/svc/servicecontext.go @@ -2,6 +2,7 @@ package svc import ( "fusenapi/server/order/internal/config" + "fusenapi/utils/queue" "fusenapi/initalize" "fusenapi/model/gmodel" @@ -15,17 +16,22 @@ type ServiceContext struct { MysqlConn *gorm.DB AllModels *gmodel.AllModelsGen Repositories *initalize.Repositories + DelayQueue *queue.DelayMessage } func NewServiceContext(c config.Config) *ServiceContext { conn := initalize.InitMysql(c.SourceMysql) + delayQueue := initalize.InitDelayMessage() + repositories := initalize.NewAllRepositories(&initalize.NewAllRepositorieData{ + GormDB: conn, + DelayQueue: delayQueue, + }) return &ServiceContext{ - Config: c, - MysqlConn: conn, - AllModels: gmodel.NewAllModels(conn), - Repositories: initalize.NewAllRepositories(&initalize.NewAllRepositorieData{ - GormDB: conn, - }), + Config: c, + MysqlConn: conn, + AllModels: gmodel.NewAllModels(conn), + Repositories: repositories, + DelayQueue: delayQueue, } } diff --git a/server/order/order.go b/server/order/order.go index 5d0508bc..a5fbba21 100644 --- a/server/order/order.go +++ b/server/order/order.go @@ -1,11 +1,13 @@ package main import ( + "context" "flag" "fmt" "net/http" "time" + "fusenapi/service/repositories" "fusenapi/utils/auth" "fusenapi/utils/fsconfig" @@ -32,6 +34,8 @@ func main() { ctx := svc.NewServiceContext(c) handler.RegisterHandlers(server, ctx) + go ctx.Repositories.NewOrder.CloseList(context.Background(), &repositories.CloseListReq{}) + fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start() } diff --git a/service/repositories/order.go b/service/repositories/order.go index d23c170d..7827c381 100644 --- a/service/repositories/order.go +++ b/service/repositories/order.go @@ -11,6 +11,8 @@ import ( "fusenapi/utils/handlers" "fusenapi/utils/order" "fusenapi/utils/pay" + "fusenapi/utils/queue" + "math" "time" "github.com/aws/aws-sdk-go/aws/session" @@ -19,15 +21,17 @@ import ( "gorm.io/gorm" ) -func NewOrder(gormDB *gorm.DB, bLMServiceUrl *string, awsSession *session.Session) Order { +func NewOrder(gormDB *gorm.DB, bLMServiceUrl *string, awsSession *session.Session, delayQueue *queue.DelayMessage) Order { return &defaultOrder{ - MysqlConn: gormDB, + MysqlConn: gormDB, + DelayQueue: delayQueue, } } type ( defaultOrder struct { - MysqlConn *gorm.DB + MysqlConn *gorm.DB + DelayQueue *queue.DelayMessage } Order interface { // 下单 @@ -42,6 +46,11 @@ type ( Detail(ctx context.Context, in *DetailReq) (res *DetailRes, err error) // 支付成功 PaymentSuccessful(ctx context.Context, in *PaymentSuccessfulReq) (res *PaymentSuccessfulRes, err error) + // 关闭 + Close(ctx context.Context, in *CloseReq) (res *CloseRes, err error) + + // 支付超时订单自动关闭 + CloseList(ctx context.Context, in *CloseListReq) (res *CloseListRes, err error) } PayInfo struct { @@ -74,6 +83,22 @@ type ( Amount int64 `json:"amount"` // 金额 Label string `json:"label"` // 标签 } + /* 支付超时订单自动关闭 */ + CloseListReq struct { + Type int64 // type:1=关闭 + } + CloseListRes struct{} + /* 支付超时订单自动关闭 */ + + /* 关闭 */ + CloseReq struct { + Type int64 // type:1=添加购物车 + OrderSn string + } + CloseRes struct { + ErrorCode basic.StatusResponse + } + /* 关闭 */ /* 支付成功 */ PaymentSuccessfulReq struct { @@ -170,6 +195,138 @@ type ( /* 列表 */ ) +// 支付超时订单自动关闭 +func (d *defaultOrder) CloseList(ctx context.Context, in *CloseListReq) (res *CloseListRes, err error) { + var orderList []gmodel.FsOrder + result := d.MysqlConn.Model(&gmodel.FsOrder{}). + Where("is_del = ?", 0). + Where("status = ?", int64(constants.ORDER_STATUS_UNPAIDDEPOSIT)). + Where("pay_status = ?", int64(constants.ORDER_PAY_STATUS_UNPAIDDEPOSIT)). + Find(&orderList) + if result.Error != nil { + logc.Errorf(ctx, "order count failed, err: %v", err) + return nil, result.Error + } + for _, orderInfo := range orderList { + var ntime = time.Now().UTC() + var cptime = orderInfo.Ctime.UTC().Add(time.Minute * 30) + var dtime time.Time + var dd = ntime.Unix() - cptime.Unix() + if dd > 0 { + dtime = time.Now().Add(time.Second * 0) + } else { + dtime = time.Now().Add(time.Second * time.Duration(math.Abs(float64(dd)))) + } + // 延时任务 + d.DelayQueue.AddTask(dtime, constants.QUEUE_NAME_ORDER, func(args ...interface{}) { + ctx := context.Background() + orderSn := args[0].(string) + logc.Infof(ctx, "order close, orderSn: %s", orderSn) + d.Close(ctx, &CloseReq{ + OrderSn: orderSn, + Type: 1, + }) + }, []interface{}{*orderInfo.OrderSn}) + } + return nil, nil +} + +// 关闭 +func (d *defaultOrder) Close(ctx context.Context, in *CloseReq) (res *CloseRes, err error) { + fmt.Println(in) + var errorCode basic.StatusResponse + err = d.MysqlConn.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var orderInfo gmodel.FsOrder + model := tx.Where("status = ?", int64(constants.ORDER_STATUS_UNPAIDDEPOSIT)).Where("pay_status = ?", int(constants.ORDER_PAY_STATUS_UNPAIDDEPOSIT)) + if in.OrderSn != "" { + model.Where("order_sn = ?", in.OrderSn) + } + result := model.Take(&orderInfo) + if result.Error != nil { + if errors.Is(result.Error, gorm.ErrRecordNotFound) { + errorCode = *basic.CodeErrOrderCreatePrePaymentInfoNoFound + } else { + errorCode = *basic.CodeServiceErr + } + logc.Errorf(ctx, "order close failed, err: %v", err) + return result.Error + } + ress, err := d.OrderDetailHandler(ctx, &orderInfo, 0) + if err != nil { + logc.Errorf(ctx, "order close failed DetailOrderDetailHandler,OrderSn:%s, err: %v", in.OrderSn, err) + return err + } + // 更新状态、状态链路 + var ntime = time.Now().UTC() + var statusCode = constants.ORDER_STATUS_CLOSE + var statusLink = order.UpdateOrderStatusLink(ress.OrderDetailOriginal.StatusLink, gmodel.OrderStatus{ + Ctime: &ntime, + Utime: &ntime, + StatusCode: statusCode, + StatusTitle: constants.OrderStatusMessage[statusCode], + }) + statusLinkByte, err := json.Marshal(statusLink) + if err != nil { + logc.Errorf(ctx, "order close failed Marshal statusLinkByte,OrderSn:%s, err: %v", in.OrderSn, err) + return err + } + + var table = gmodel.NewAllModels(tx).FsOrder.TableName() + var resUpdate *gorm.DB + var resUpdateSql string = fmt.Sprintf("UPDATE %s SET `status` = %d , `utime` = '%s'", table, statusCode, ntime) + resUpdate = tx.Exec(fmt.Sprintf("%s ,`status_link`= JSON_MERGE_PATCH(`status_link`,?) WHERE `id` = %d", resUpdateSql, orderInfo.Id), statusLinkByte) + + err = resUpdate.Error + if err != nil { + logc.Errorf(ctx, "order close failed Update FsOrder,OrderSn:%s, err: %v", in.OrderSn, err) + return err + } + + // 新增购物车 + if in.Type == 1 { + var users []gmodel.FsShoppingCart + for _, shoppingCart := range ress.OrderDetailOriginal.ShoppingCartSnapshot { + users = append(users, gmodel.FsShoppingCart{ + UserId: shoppingCart.UserId, + ProductId: shoppingCart.ProductId, + TemplateId: shoppingCart.TemplateId, + ModelId: shoppingCart.ModelId, + SizeId: shoppingCart.SizeId, + LightId: shoppingCart.FittingId, + FittingId: shoppingCart.FittingId, + PurchaseQuantity: shoppingCart.PurchaseQuantity, + Snapshot: shoppingCart.Snapshot, + IsSelected: shoppingCart.IsSelected, + IsHighlyCustomized: shoppingCart.IsHighlyCustomized, + Ctime: &ntime, + Utime: &ntime, + }) + } + resCreate := tx.Create(&users) + err = resCreate.Error + if err != nil { + logc.Errorf(ctx, "order close failed Create FsShoppingCart,OrderSn:%s, err: %v", in.OrderSn, err) + return err + } + } + return nil + }) + if err != nil { + logc.Errorf(ctx, "order close failed, err: %v", err) + if errorCode.Code == 0 { + errorCode.Code = basic.CodeApiErr.Code + errorCode.Message = basic.CodeApiErr.Message + } + return &CloseRes{ + ErrorCode: errorCode, + }, err + } + + return &CloseRes{ + ErrorCode: errorCode, + }, err +} + // 支付成功 func (d *defaultOrder) PaymentSuccessful(ctx context.Context, in *PaymentSuccessfulReq) (res *PaymentSuccessfulRes, err error) { var orderSn string @@ -380,7 +537,7 @@ func (d *defaultOrder) PaymentSuccessful(ctx context.Context, in *PaymentSuccess }) // 更新数据库 - var table = gmodel.NewAllModels(d.MysqlConn).FsOrder.TableName() + var table = gmodel.NewAllModels(tx).FsOrder.TableName() var resUpdate *gorm.DB var resUpdateSql string if *orderInfo.Status == int64(constants.ORDER_STATUS_UNPAIDDEPOSIT) { @@ -388,7 +545,7 @@ func (d *defaultOrder) PaymentSuccessful(ctx context.Context, in *PaymentSuccess } else { resUpdateSql = fmt.Sprintf("UPDATE %s SET `pay_status` = %d , `utime` = '%s'", table, orderPayStatusCode, ntime) } - resUpdate = d.MysqlConn.Exec(fmt.Sprintf("%s ,`status_link`= JSON_MERGE_PATCH(`status_link`,?),`pay_status_link`= JSON_MERGE_PATCH(`pay_status_link`,?),`order_amount`= JSON_MERGE_PATCH(`order_amount`,?) WHERE `id` = %d", resUpdateSql, orderInfo.Id), statusLinkByte, payStatusLinkByte, orderAmountByte) + resUpdate = tx.Exec(fmt.Sprintf("%s ,`status_link`= JSON_MERGE_PATCH(`status_link`,?),`pay_status_link`= JSON_MERGE_PATCH(`pay_status_link`,?),`order_amount`= JSON_MERGE_PATCH(`order_amount`,?) WHERE `id` = %d", resUpdateSql, orderInfo.Id), statusLinkByte, payStatusLinkByte, orderAmountByte) err = resUpdate.Error if err != nil { diff --git a/utils/order/order.go b/utils/order/order.go index e28f0278..7543e92b 100644 --- a/utils/order/order.go +++ b/utils/order/order.go @@ -140,14 +140,19 @@ func GenerateOrderStatusLink(deliveryMethod int64, noTime time.Time, expectedTim orderStatus = constants.OrderStatusUserCLOUDSTORE } for _, v := range orderStatus { - list = append(list, gmodel.OrderStatus{ + item := gmodel.OrderStatus{ StatusCode: v, StatusTitle: constants.OrderStatusMessage[v], - }) + } + if v == constants.ORDER_STATUS_UNPAIDDEPOSIT { + item.Ctime = &noTime + item.Utime = &noTime + } + if v == constants.ORDER_STATUS_DIRECTMAIL_ARRIVED || v == constants.ORDER_STATUS_CLOUDSTORE_ARRIVEDWAREHOUSE { + item.ExpectedTime = &expectedTime + } + list = append(list, item) } - list[0].Ctime = &noTime - list[0].Utime = &noTime - list[len(list)-1].ExpectedTime = &expectedTime return list } diff --git a/utils/queue/delayMessage.go b/utils/queue/delayMessage.go new file mode 100644 index 00000000..b1c079e3 --- /dev/null +++ b/utils/queue/delayMessage.go @@ -0,0 +1,152 @@ +package queue + +import ( + "errors" + "fmt" + "time" +) + +// 延迟消息 +type DelayMessage struct { + //当前下标 + curIndex int + //环形槽 + slots [3600]map[string]*Task + //关闭 + closed chan bool + //任务关闭 + taskClose chan bool + //时间关闭 + timeClose chan bool + //启动时间 + startTime time.Time +} + +// 执行的任务函数 +type TaskFunc func(args ...interface{}) + +// 任务 +type Task struct { + //循环次数 + cycleNum int + //执行的函数 + exec TaskFunc + params []interface{} +} + +// 创建一个延迟消息 +func NewDelayMessage() *DelayMessage { + dm := &DelayMessage{ + curIndex: 0, + closed: make(chan bool), + taskClose: make(chan bool), + timeClose: make(chan bool), + startTime: time.Now(), + } + for i := 0; i < 3600; i++ { + dm.slots[i] = make(map[string]*Task) + } + return dm +} + +// 启动延迟消息 +func (dm *DelayMessage) Start() { + go dm.taskLoop() + go dm.timeLoop() + select { + case <-dm.closed: + { + dm.taskClose <- true + dm.timeClose <- true + break + } + } +} + +// 关闭延迟消息 +func (dm *DelayMessage) Close() { + dm.closed <- true +} + +// 处理每1秒的任务 +func (dm *DelayMessage) taskLoop() { + defer func() { + fmt.Println("taskLoop exit") + }() + for { + select { + case <-dm.taskClose: + { + return + } + default: + { + //取出当前的槽的任务 + tasks := dm.slots[dm.curIndex] + if len(tasks) > 0 { + //遍历任务,判断任务循环次数等于0,则运行任务 + //否则任务循环次数减1 + for k, v := range tasks { + if v.cycleNum == 0 { + go v.exec(v.params...) + //删除运行过的任务 + delete(tasks, k) + } else { + v.cycleNum-- + } + } + } + } + } + } +} + +// 处理每1秒移动下标 +func (dm *DelayMessage) timeLoop() { + defer func() { + fmt.Println("timeLoop exit") + }() + tick := time.NewTicker(time.Second) + for { + select { + case <-dm.timeClose: + { + return + } + case <-tick.C: + { + //fmt.Println(time.Now().Format("2006-01-02 15:04:05")) + //判断当前下标,如果等于3599则重置为0,否则加1 + if dm.curIndex == 3599 { + dm.curIndex = 0 + } else { + dm.curIndex++ + } + } + } + } +} + +// 添加任务 +func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []interface{}) error { + if dm.startTime.After(t) { + return errors.New("时间错误") + } + //当前时间与指定时间相差秒数 + subSecond := t.Unix() - dm.startTime.Unix() + //计算循环次数 + cycleNum := int(subSecond / 3600) + //计算任务所在的slots的下标 + ix := subSecond % 3600 + //把任务加入tasks中 + tasks := dm.slots[ix] + if _, ok := tasks[key]; ok { + return errors.New("该slots中已存在key为" + key + "的任务") + } + tasks[key] = &Task{ + cycleNum: cycleNum, + exec: exec, + params: params, + } + return nil +}