Skip to content

Commit

Permalink
feat: 增加了购买前发送购买消息到消息队列中
Browse files Browse the repository at this point in the history
  • Loading branch information
universero committed Nov 10, 2024
1 parent 540269f commit a088b2d
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 2 deletions.
12 changes: 10 additions & 2 deletions biz/application/service/charge.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/xh-polaris/openapi-core-api/biz/application/dto/openapi/charge"
"github.com/xh-polaris/openapi-core-api/biz/application/dto/openapi/core_api"
"github.com/xh-polaris/openapi-core-api/biz/infrastructure/consts"
"github.com/xh-polaris/openapi-core-api/biz/infrastructure/mq"
"github.com/xh-polaris/openapi-core-api/biz/infrastructure/rpc/openapi_charge"
"github.com/xh-polaris/openapi-core-api/biz/infrastructure/rpc/openapi_user"
"github.com/xh-polaris/openapi-core-api/biz/infrastructure/util"
Expand Down Expand Up @@ -147,7 +148,9 @@ func (s *ChargeService) BuyFullInterface(ctx context.Context, req *core_api.BuyF

// 计算总额
var amount int64
var rate int64
amount = 0
rate = 100
amount = increment * fullInf.Price

// 判断是否折扣
Expand All @@ -167,8 +170,7 @@ func (s *ChargeService) BuyFullInterface(ctx context.Context, req *core_api.BuyF
if gradients.Status != 0 {
return util.FailResponse(nil, "折扣暂不可用,购买失败"), nil
}
var rate int64
rate = 100

for _, discount := range gradients.Discounts {
if increment > discount.Low {
rate = discount.Rate
Expand All @@ -179,6 +181,12 @@ func (s *ChargeService) BuyFullInterface(ctx context.Context, req *core_api.BuyF

txId := primitive.NewObjectID().Hex() // 事务id

// 给消息队列发送对账消息
err := mq.SendBuyMsg(txId, -1*amount, rate, increment, fullInf.Price)
if err != nil {
return util.FailResponse(nil, "消息发送失败"), err
}

// 扣除用户余额
remainResp, err := s.UserClient.SetRemain(ctx, &genuser.SetRemainReq{
UserId: userId,
Expand Down
5 changes: 5 additions & 0 deletions biz/infrastructure/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ type Auth struct {
AccessExpire int64
}

type RocketMQ struct {
NameServers []string
}

type Config struct {
service.ServiceConf
ListenOn string
TimeThreshold int64
Auth Auth
Action map[string]string
RocketMQ RocketMQ
}

func NewConfig() (*Config, error) {
Expand Down
5 changes: 5 additions & 0 deletions biz/infrastructure/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,8 @@ var FilteredHeaders = map[string]bool{
"Content-Length": true,
"X-Xh-Env": true,
}

// mq
const (
BuyTopic = "openapi_buy"
)
59 changes: 59 additions & 0 deletions biz/infrastructure/mq/buy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package mq

import (
"context"
"github.com/apache/rocketmq-client-go/v2"
mqprimitive "github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/cloudwego/hertz/pkg/common/json"
logx "github.com/xh-polaris/gopkg/util/log"
"github.com/xh-polaris/openapi-core-api/biz/infrastructure/config"
"github.com/xh-polaris/openapi-core-api/biz/infrastructure/consts"
)

func SendBuyMsg(txId string, amount int64, rate int64, increment int64, price int64) error {
// 创建一个Producer实例
p, err := rocketmq.NewProducer(
producer.WithNsResolver(mqprimitive.NewPassthroughResolver(config.GetConfig().RocketMQ.NameServers)),
producer.WithRetry(2),
)
if err != nil {
return err
}

err = p.Start()
if err != nil {
return err
}
defer func() {
err2 := p.Shutdown()
if err2 != nil {
logx.Error("producer关闭失败")
}
}()

// 定义消息内容
var msgBody map[string]interface{}
msgBody["txId"] = txId // 事务id
msgBody["rate"] = rate // 折扣
msgBody["increment"] = increment // 接口余量增加量
msgBody["amount"] = amount // 用户余额扣减
msgBody["price"] = price // 购买时单价

msgBodyBytes, err := json.Marshal(msgBody)
if err != nil {
return err
}

msg := &mqprimitive.Message{
Topic: consts.BuyTopic,
Body: msgBodyBytes,
}

_, err = p.SendSync(context.Background(), msg)
if err != nil {
return err
}

return nil
}
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ require (

require (
github.com/andeya/goutil v1.0.0 // indirect
github.com/apache/rocketmq-client-go v1.2.4 // indirect
github.com/apache/rocketmq-client-go/v2 v2.1.2 // indirect
github.com/apache/thrift v0.13.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bufbuild/protocompile v0.10.0 // indirect
Expand All @@ -44,13 +46,16 @@ require (
github.com/cloudwego/runtimex v0.1.0 // indirect
github.com/cloudwego/thriftgo v0.3.17 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emirpasic/gods v1.12.0 // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/fatih/structtag v1.2.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/henrylee2cn/ameda v1.5.1 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
Expand All @@ -60,6 +65,7 @@ require (
github.com/kitex-contrib/obs-opentelemetry v0.2.3 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -68,12 +74,15 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nyaruka/phonenumbers v1.1.6 // indirect
github.com/openzipkin/zipkin-go v0.4.3 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.20.2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/tidwall/gjson v1.17.3 // indirect
Expand All @@ -90,6 +99,7 @@ require (
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
golang.org/x/arch v0.2.0 // indirect
golang.org/x/net v0.28.0 // indirect
Expand All @@ -98,6 +108,8 @@ require (
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
stathat.com/c/consistent v1.0.0 // indirect
)
Loading

0 comments on commit a088b2d

Please sign in to comment.