队列服务器
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"time"
"github.com/go-redis/redis/v8"
)
const (
RedisAddress = "localhost:6379"
Topic = "my_topic"
)
type Message struct {
Protocol string
Data string
}
func main() {
// 创建Redis客户端
rdb := redis.NewClient(&redis.Options{
Addr: RedisAddress,
})
// 创建订阅者
subscriber := NewSubscriber(rdb)
// 启动订阅协程
go func() {
err := subscriber.Subscribe(Topic, handleMessage)
if err != nil {
log.Fatal(err)
}
}()
// 等待信号中断
waitForInterrupt()
// 关闭订阅者连接
subscriber.Close()
}
// Subscriber 订阅者
type Subscriber struct {
rdb *redis.Client
pubsub *redis.PubSub
}
// NewSubscriber 创建新的订阅者
func NewSubscriber(rdb *redis.Client) *Subscriber {
return &Subscriber{
rdb: rdb,
pubsub: rdb.Subscribe(context.Background(), Topic),
}
}
// Subscribe 订阅指定主题
func (s *Subscriber) Subscribe(handler func(Message)) error {
ch := s.pubsub.Channel()
for msg := range ch {
message := parseMessage(msg.Payload)
handler(message)
}
return nil
}
// Close 关闭订阅者连接
func (s *Subscriber) Close() error {
return s.pubsub.Close()
}
// 处理消息
func handleMessage(message Message) {
fmt.Println("Received message:", message.Data)
switch message.Protocol {
case "payment":
// TODO: 处理付款回调消息的逻辑
fmt.Println("Handling payment callback...")
case "upgrade":
// TODO: 处理升级消息的逻辑
fmt.Println("Handling upgrade message...")
default:
// TODO: 处理其他类型的消息的逻辑
fmt.Println("Handling other type of message...")
}
}
// 解析消息
func parseMessage(payload string) Message {
// TODO: 解析消息内容,提取协议和数据字段
return Message{
Protocol: "payment", // 假设协议字段在消息中指定
Data: payload,
}
}
// 等待中断信号
func waitForInterrupt() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}