package main
import (
"context"
"log"
"time"
"github.com/sagoo-cloud/nexframe/queue"
_ "github.com/sagoo-cloud/nexframe/queue/redisqueue"
)
func main() {
// 获取 Redis 队列实例
redisQueue := queue.GetQueue("myRedisQueue", queue.DriverTypeRedis)
// 启动生产者
go producer(redisQueue)
// 启动消费者
go consumer(redisQueue)
// 运行一段时间后退出
time.Sleep(1 * time.Minute)
}
func producer(q queue.Queue) {
ctx := context.Background()
key := "myQueue"
for i := 0; i < 10; i++ {
message := fmt.Sprintf("消息 #%d", i)
ok, err := q.Enqueue(ctx, key, message)
if err != nil {
log.Printf("生产者: 入队失败: %v", err)
continue
}
if !ok {
log.Printf("生产者: 消息 '%s' 入队操作未成功执行", message)
continue
}
log.Printf("生产者: 消息 '%s' 已成功入队", message)
time.Sleep(1 * time.Second)
}
}
func consumer(q queue.Queue) {
ctx := context.Background()
key := "myQueue"
for {
message, _, token, dequeueCount, err := q.Dequeue(ctx, key)
if err != nil {
log.Printf("消费者: 出队失败: %v", err)
time.Sleep(1 * time.Second)
continue
}
if message == "" {
log.Println("消费者: 队列为空,等待新消息...")
time.Sleep(1 * time.Second)
continue
}
log.Printf("消费者: 收到消息: %s, 出队次数: %d", message, dequeueCount)
// 处理消息...
ok, err := q.AckMsg(ctx, key, token)
if err != nil {
log.Printf("消费者: 确认消息失败: %v", err)
continue
}
if !ok {
log.Println("消费者: 消息确认操作未成功执行")
continue
}
log.Println("消费者: 消息已成功确认")
}
}