摘要:或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,。下面我说一下使用 list 实现一个简单的消息队列的整体思路。

使用 go 语言基于 redis 写了一个简单的消息队列

源码地址

使用demo

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

添加数据和获取数据的操作也是非常简单的

LPUSH 从左边插入数据

RPUSH 大右边插入数据

LPOP 从左边取出一个数据

RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,

如果这期间有数据写入,则会读取并返回,没有数据则会返回空

在一个 窗口1 读取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一个 窗口2 写入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再开一个 窗口3 读取,第二次读取时, list 是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个 consumer 消费掉?

当然不会,因为redis是单线程的,在从 list 取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用 list 实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用 LPOP 或者 BLPOP 都可以,

这里做了一个开关 optionsUseBLopp 如果为 true 时会使用 BLPOP

type consumer struct {
	once            sync.Once
	redisCmd        redis.Cmdable
	ctx             context.Context
	topicName       string
	handler         Handler
	rateLimitPeriod time.Duration
	options         ConsumerOptions
	_               struct{}
}

type ConsumerOptions struct {
	RateLimitPeriod time.Duration
	UseBLPop        bool
}

看一下创建 consumer 的代码,最后面的 opts 参数是可选的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
	consumer := &consumer{
		redisCmd:  redisCmd,
		ctx:       ctx,
		topicName: topicName,
	}
	for _, o := range opts {
		o(&consumer.options)
	}
	if consumer.options.RateLimitPeriod == 0 {
		consumer.options.RateLimitPeriod = time.Microsecond * 200
	}
	return consumer
}

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理

有一个小的 interface 调用者根据自己的逻辑去实现

type Handler interface {
	HandleMessage(msg *Message)
}

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {
	go func() {
		ticker := time.NewTicker(s.options.RateLimitPeriod)
		defer func() {
			log.Println("stop get message.")
			ticker.Stop()
		}()
		for {
			select {
			case <-s.ctx.Done():
				log.Printf("context Done msg: %#v \n", s.ctx.Err())
				return
			case <-ticker.C:
				var revBody []byte
				var err error
				if !s.options.UseBLPop {
					revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
				} else {
					revs := s.redisCmd.BLPop(time.Second, s.topicName)
					err = revs.Err()
					revValues := revs.Val()
					if len(revValues) >= 2 {
						revBody = []byte(revValues[1])
					}
				}
				if err == redis.Nil {
					continue
				}
				if err != nil {
					log.Printf("LPOP error: %#v \n", err)
					continue
				}

				if len(revBody) == 0 {
					continue
				}
				msg := &Message{}
				json.Unmarshal(revBody, msg)
				if s.handler != nil {
					s.handler.HandleMessage(msg)
				}
			}
		}
	}()
}

Producer 的实现

Producer 还是很简单的就是把数据推送到 reids

type Producer struct {
	redisCmd redis.Cmdable
	_        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
	return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
	msg := NewMessage("", body)
	sendData, _ := json.Marshal(msg)
	return p.redisCmd.RPush(topicName, string(sendData)).Err()
}
相关文章