原项目Github 链接 :https://github.com/Shopify/sarama
sarama是一个go语言实现的kafka客户端,是官方推荐的kafka客户端之一
特点是简单,易用,很容易上手。再加上纯go语言开发,对于我们理解、排查问题以及魔改都比较方便。
目前github 8.2k stars
本文基于 sarama v1.31.1
个人注释版:https://github.com/freephenix/sarama
后续的更新会提交到这里,文章可能更新延迟
消费者
下面开始阅读源码学习,首先,我希望能够带着问题去寻找答案,这样更加容易找到一条线,避免阅读时迷失方向。
也给大家建议在阅读不熟悉的项目时找准一条线,从浅入深,一层层去看,就是常听到的剥洋葱式学习方式,我认为这样的本意也是避免迷失,能更高效的理解系统。
先提几个问题
首先,我知道消费者分两种,一种是消费者组,一种是单体的消费者。对于我来说,消费者组更常用,所以先从它开始。
在消费者组工作过程中,我根据自己的经验,可以想象到,它主要有四个工作
• 如何消费消息
• 如何处理rebalance,增减负责消费的partition
• 如何处理异常
• 如何处理关闭信号
还有就是隐含在这四个工作之下的一些问题
• 如何和服务端通信
• 如何控制消费者组的启停
• 起停时都做了哪些工作
• ...
所以下面在阅读时候,我会着重寻找这几个问题的答案
主要数据结构
文件位置 ./consumer_group.go
// 消费者组对外暴露的interface,new方法中返回的就是这个
type ConsumerGroup interface {
// 需要传入给定的topic组,还有消费这些消息的接口实现ConsumerGroupHandler
// ConsumerGroupHandler 下面会讲到,暂时先看消费者组session内的生命周期步骤
// 1. 加入此消费者组,服务端会公平的分配partition到当前消费者
// 2. 程序启动前,会调用handler中的Setup()方法,可以通知用户和进行准备工作
// 3. 任何消息都会调用handler的ConsumeClaim()方法。
// 这个方法在每个独立的goroutine中需要是线程安全的,消费状态需要小心的维护
// 4. 消费者退出有三种情况handler的ConsumeClaim()退出;
// 服务端发起了一次rebalance;父级context触发了cancel()
// 5. 当所有ConsumeClaim()退出时,handler的Cleanup()钩子函数会被调用,
// 用以执行rebalance前的收尾工作
// 6. 消费完成时,标记并提交offset
// 有几点需要注意, 一旦rebalance触发,那么消息处理函数ConsumeClaim()必须在配置项
// Config.Consumer.Group.Rebalance.Timeout规定的时间内退出,而且需要给offset提交
// 和清理函数Cleanup()留出时间. 如果超时未完成,那么这个消费者会被kafka服务端从这个组中除名,
// 导致消息offset提交失败。
// 这个方法应该被放在无限循环中,因为当rebalance发生时,这个函数会退出,所以需要再次调用启动
Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
// Errors 接受sarama错误信息的channel,但是默认情况下,错误会直接打印日志而非发送到这里,
// 如果需要的话,可以将Consumer.Return.Errors设置为true,然后就可以监听这个channel了
Errors() <-chan error
// Close 停止consumerGroup并将所有session分离出去.
// 为防止内存泄露,要在对象逃逸之前调用这个函数
Close() error
}
type consumerGroup struct {
// 通用kafka client 管理连接,必须close防止逃逸
client Client
// 配置项很多,暂不展开
config *Config
// 管理真正的从broker消费消息的PartitionConsumer,必须close防止逃逸
consumer Consumer
// 组名
groupID string
// ???
memberID string
// 接收错误的channel
errors chan error
lock sync.Mutex
// 接收结束信号的channel
closed chan none
// 保证只close一次
closeOnce sync.Once
// ???
userData []byte
}
/*******************************************************************/
// ConsumerGroupSession 代表了一个consumerGroup的用户生命周期
type ConsumerGroupSession interface {
// Claims 返回topic到被分配的partition数组的map
Claims() map[string][]int32
// MemberID 返回集群中的memberID
MemberID() string
// GenerationID 返回当前的GenerationID,这个字段类似选举的group Leader的版本号
GenerationID() int32
// MarkOffset 标记offset,同时在metadata中记录一个字符串,用来标识当时partition
// consumer的状态,其他的consumer可以根据这个字符串恢复消费
//
// 按照上传约定, 调用这个方法时,你是在标记预期要读取的消息,而不是刚读取到的消息。
// 也正因此,调用这个方法时,你需要传入的参数是上一个消息的offset+1
//
// 注意:因为效率的原因,调用 MarkOffset时并不会立即执行消息的提交,所以如果程序崩溃时,有可能一些消息
// 并没有提交成功。也就是说你的程序有可能两次接受同样的一个消息,所以需要你的程序保证幂等性。
MarkOffset(topic string, partition int32, offset int64, metadata string)
// Commit 就是我们熟知的消息提交了
// 注意: 调用Commit是一个同步阻塞的操作
Commit()
// ResetOffset 根据提供的值重置offset,同时在metadata中记录一个字符串,用来标识当时partition
// consumer的状态。重置操作对应 MarkOffset,不同之处在于,它可以传入一个比当前offset小的值,而
// MarkOffset只允许传入比当前offset大的值,详情请查阅 MarkOffset。
//(cf. 是一个拉丁语衍生的(也是现代英语)词汇confer的简写,表示“比较”或“查阅”的意思。 主要用于普通法和成文法文本中,还有学术著作。)
ResetOffset(topic string, partition int32, offset int64, metadata string)
// MarkMessage 标记一个消息已经消费了。
MarkMessage(msg *ConsumerMessage, metadata string)
// Context 返回session中的context
Context() context.Context
}
type consumerGroupSession struct {
// 父级对象 consumerGroup
parent *consumerGroup
memberID string
generationID int32
handler ConsumerGroupHandler
claims map[string][]int32
// 处理offset相关功能,如提交、POM写入、POM解析
offsets *offsetManager
ctx context.Context
cancel func()
waitGroup sync.WaitGroup
// 保证资源清理只做一次
releaseOnce sync.Once
// 心跳状态通知channel
hbDying, hbDead chan none
}
/*******************************************************************/
// ConsumerGroupHandler 处理独立的topic或者分区的消息。同时也提供了钩子方法,允许你在session生命周期
// 的开始和结束时嵌入逻辑
// 请注意,这个handler会在多个不同的goroutine中调用,请确保状态是并发安全的,并且不存在数据竞争
type ConsumerGroupHandler interface {
// Setup 在session的开始阶段运行, 早于 ConsumeClaim.
Setup(ConsumerGroupSession) error
// Cleanup 在session的结束阶段运行, 在所有 ConsumeClaim 的goroutines 退出之后、但在最后
// 一次offset提交之前,调用且只调用一次
Cleanup(ConsumerGroupSession) error
// ConsumeClaim 必须起一个ConsumerGroupClaim的Messages()的循环,一旦Messages()关闭时,
// Handler 必须结束循环并退出
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
// ConsumerGroupClaim 在一个consumerGroup之内,处理一个topic和partition中的消息.
type ConsumerGroupClaim interface {
// Topic 返回消费的topic
Topic() string
// Partition 返回消费的partition
Partition() int32
// InitialOffset 返回开始消费时初始化的offset
InitialOffset() int64
// HighWaterMarkOffset 返回服务端产生的下一条消息的offset(并不是客户端下一条接受的)。
// 你可以根据这个判断自己的处理程序落后了多少
//(i.e. 换句话说)
HighWaterMarkOffset() int64
// Messages 返回从broker读消息的channel。这个channel当新的rebalance触发时会关闭. 你必须在
// Config.Consumer.Group.Session.Timeout 约定的时间内完成退出和标记offsets,这样才能让
// topic/partition重新分配成为消费组中的新成员
Messages() <-chan *ConsumerMessage
}
type consumerGroupClaim struct {
topic string
partition int32
offset int64
PartitionConsumer
}
以上是consumerGroup.go中三个接口类型和相应的几个结构体
通过对这些注释进行分析结合我们自己的使用,我们可以得出以下一些结论
1、consumerGroup是最外层的结构体,负责消费者组的起停
(1)其结构体内有负责管理连接的client
(2)实际消费用的是consumer结构体,真实消费消息的是consumer内的partitionConsumer
2、ConsumerGroupHandler是消息处理者的接口,用于对接业务逻辑和消费相关功能对接,还提供了启动开始和结束时两个钩子函数
3、ConsumerGroupSession是代表消费者组生命周期的结构,提供了一些列获取、控制消费者组状态信息的方法
4、ConsumerGroupClaim是消息的出口结构,主要功能就是用于消息的消费,提供消息的内容
消费者组启动
如何启动其实在对于结构体的注释阅读已经明白了,那我们主要要看的就是正常使用的启动时,那些方法都做了哪些事情
首先,是初始化
/* consumerGroup.go */
// NewConsumerGroup 主要工作是启动一个client,client的主要职责就是跟broker建tcp连接,保持更新元数据
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
// 起客户端,与broker保持通信,主要维护元数据
client, err := NewClient(addrs, config)
if err != nil {
return nil, err
}
// 起消费者组,主要就是把client组装进去
c, err := newConsumerGroup(groupID, client)
if err != nil {
_ = client.Close()
}
return c, err
}
// ...
func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
config := client.Config()
if !config.Version.IsAtLeast(V0_10_2_0) {
return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
}
consumer, err := NewConsumerFromClient(client)
if err != nil {
return nil, err
}
return &consumerGroup{
client: client,
consumer: consumer,
config: config,
groupID: groupID,
errors: make(chan error, config.ChannelBufferSize),
closed: make(chan none),
}, nil
}
/* client.go */
// NewClient 创建一个新的客户端,会随机选一个broker地址,自动拉取元数据信息,如果所有broker都拿不到地址,那客户端创建失败
func NewClient(addrs []string, conf *Config) (Client, error) {
Logger.Println("Initializing new client")
if conf == nil {
conf = NewConfig()
}
if err := conf.Validate(); err != nil {
return nil, err
}
if len(addrs) < 1 {
return nil, ConfigurationError("You must provide at least one broker address")
}
client := &client{
conf: conf,
closer: make(chan none),
closed: make(chan none),
brokers: make(map[int32]*Broker),
metadata: make(map[string]map[int32]*PartitionMetadata),
metadataTopics: make(map[string]none),
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
coordinators: make(map[string]int32),
}
// 多个broker时,打乱顺序
client.randomizeSeedBrokers(addrs)
// 默认为true,会多占些空间,不过会提前加载好元数据信息
// 为false时,则只会在下方开的goroutine里定时刷新元数据
if conf.Metadata.Full {
// 刷新元数据,建立tcp连接,通信拿取元数据
err := client.RefreshMetadata()
switch err {
case nil:
break
case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
Logger.Println(err)
default:
close(client.closed) // we haven't started the background updater yet, so we have to do this manually
_ = client.Close()
return nil, err
}
}
// 起定时器,定期更新元数据
go withRecover(client.backgroundMetadataUpdater)
Logger.Println("Successfully initialized new client")
return client, nil
}
/* broker.go */
// 在 client.RefreshMetadata() 中会有与broker建连的过程,这里简单提下
// Open 并发方式尝试去创建一个broker连接,如果已经连接过则会报错,并不会阻塞在这里,但是任何后续的broker操作都会阻塞等待连接成功或失败
// 想要获取返回结果,可以调用函数 Connected(),唯一直接返回的错误是配置项校验。没有配置项写入时会使用 NewConfig()的默认配置。
func (b *Broker) Open(conf *Config) error {
if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
return ErrAlreadyConnected
}
if conf == nil {
conf = NewConfig()
}
err := conf.Validate()
if err != nil {
return err
}
b.lock.Lock()
go withRecover(func() {
defer b.lock.Unlock()
dialer := conf.getDialer()
// 建立一个tcp连接,尝试拨号
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
if b.connErr != nil {
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
return
}
if conf.Net.TLS.Enable {
// 包装为传输层安全的客户端
b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
}
b.conn = newBufConn(b.conn)
b.conf = conf
// ...
b.done = make(chan bool)
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
if b.id >= 0 {
Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
} else {
Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
}
// 监听返回内容,这里是每个broker中维护的处理response的地方
go withRecover(b.responseReceiver)
})
return nil
}
func (b *Broker) responseReceiver() {
var dead error
// 无尽循环读取内容,直到Close()中b.responses被关闭
for response := range b.responses {
// 根据内容本身,选择分发到response.errors 或者 response.packets
if dead != nil {
// This was previously incremented in send() and
// we are not calling updateIncomingCommunicationMetrics()
b.addRequestInFlightMetrics(-1)
response.errors <- dead
continue
}
headerLength := getHeaderLength(response.headerVersion)
header := make([]byte, headerLength)
bytesReadHeader, err := b.readFull(header)
requestLatency := time.Since(response.requestTime)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
dead = err
response.errors <- err
continue
}
decodedHeader := responseHeader{}
err = versionedDecode(header, &decodedHeader, response.headerVersion)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
dead = err
response.errors <- err
continue
}
// correlationID 是发送时写入进去的(自增)关联ID,可以代表发送的顺序
if decodedHeader.correlationID != response.correlationID {
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
// TODO if decoded ID < cur ID, discard until we catch up
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
response.errors <- dead
continue
}
buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
bytesReadBody, err := b.readFull(buf)
b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
if err != nil {
dead = err
response.errors <- err
continue
}
response.packets <- buf
}
close(b.done)
}
然后,是consumerGroup.Consume()启动消费
/* consumerGroup.go */
// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
// Ensure group is not closed
// fail fast
select {
case <-c.closed:
return ErrClosedConsumerGroup
default:
}
c.lock.Lock()
defer c.lock.Unlock()
// Quick exit when no topics are provided
if len(topics) == 0 {
return fmt.Errorf("no topics provided")
}
// Refresh metadata for requested topics
// 刷新下元数据
if err := c.client.RefreshMetadata(topics...); err != nil {
return err
}
// Init session
// 初始化 consumerGroupSession,启动真正的消费
sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
if err == ErrClosedClient {
return ErrClosedConsumerGroup
} else if err != nil {
return err
}
// 无限循环检查分区数是否变化,决定是否rebalance,这里有漏洞,如果有起有停,那就不会触发
go c.loopCheckPartitionNumbers(topics, sess)
// Wait for session exit signal
<-sess.ctx.Done()
// Gracefully release session claims
return sess.release(true)
}
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
// 选定一个broker,同步元数据、进行各种请求之类的
coordinator, err := c.client.Coordinator(c.groupID)
if err != nil {
if retries <= 0 {
return nil, err
}
// 重试机制,次数默认4
return c.retryNewSession(ctx, topics, handler, retries, true)
}
// Join consumer group
// 发送加入组中的请求
join, err := c.joinGroupRequest(coordinator, topics)
if err != nil {
_ = coordinator.Close()
return nil, err
}
switch join.Err {
//...
}
// memberID 看来是发送加组请求后分配的唯一标识
var plan BalanceStrategyPlan
// 如果是leader则需要根据分配策略算出一个分配计划
if join.LeaderId == join.MemberId {
members, err := join.GetMembers()
if err != nil {
return nil, err
}
// 按配置中的策略分配,默认 BalanceStrategyRange
plan, err = c.balance(members)
if err != nil {
return nil, err
}
}
// 看来类似ack,只是如果是leader的话,还需要将分配完的计划返回
groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
if err != nil {
_ = coordinator.Close()
return nil, err
}
switch groupRequest.Err {
//...
}
// Retrieve and sort claims
var claims map[string][]int32
if len(groupRequest.MemberAssignment) > 0 {
members, err := groupRequest.GetMemberAssignment()
if err != nil {
return nil, err
}
claims = members.Topics
c.userData = members.UserData
for _, partitions := range claims {
sort.Sort(int32Slice(partitions))
}
}
return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
// init offset manager
offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
if err != nil {
return nil, err
}
// init context
ctx, cancel := context.WithCancel(ctx)
// init session
sess := &consumerGroupSession{
parent: parent,
memberID: memberID,
generationID: generationID,
handler: handler,
offsets: offsets,
claims: claims,
ctx: ctx,
cancel: cancel,
hbDying: make(chan none),
hbDead: make(chan none),
}
// start heartbeat loop
// 开始心跳,检测状态、处理错误,内有重试机制
go sess.heartbeatLoop()
// create a POM for each claim
// 发送请求,获取初始offset等信息
for topic, partitions := range claims {
for _, partition := range partitions {
pom, err := offsets.ManagePartition(topic, partition)
if err != nil {
_ = sess.release(false)
return nil, err
}
// handle POM errors
go func(topic string, partition int32) {
for err := range pom.Errors() {
sess.parent.handleError(err, topic, partition)
}
}(topic, partition)
}
}
// perform setup
// 调用 ConsumerGroupHandler 的 Setup() 开始消费前的钩子
if err := handler.Setup(sess); err != nil {
// 无处不在的有问题就释放,可以用defer吧?
_ = sess.release(true)
return nil, err
}
// start consuming
for topic, partitions := range claims {
for _, partition := range partitions {
sess.waitGroup.Add(1)
go func(topic string, partition int32) {
defer sess.waitGroup.Done()
// cancel the as session as soon as the first
// goroutine exits
defer sess.cancel()
// consume a single topic/partition, blocking
sess.consume(topic, partition)
}(topic, partition)
}
}
return sess, nil
}
func (s *consumerGroupSession) consume(topic string, partition int32) {
// quick exit if rebalance is due
select {
case <-s.ctx.Done():
return
case <-s.parent.closed:
return
default:
}
// get next offset
offset := s.parent.config.Consumer.Offsets.Initial
if pom := s.offsets.findPOM(topic, partition); pom != nil {
offset, _ = pom.NextOffset()
}
// create new claim
// 带着初始化的offset创建新的claim,起一个分区的消费者,
// 真正的消费者是建立在topic下的partition上的,partitionConsumer
claim, err := newConsumerGroupClaim(s, topic, partition, offset)
if err != nil {
s.parent.handleError(err, topic, partition)
return
}
// handle errors
go func() {
for err := range claim.Errors() {
s.parent.handleError(err, topic, partition)
}
}()
// trigger close when session is done
// 再次判断是否close
go func() {
select {
case <-s.ctx.Done():
case <-s.parent.closed:
}
claim.AsyncClose()
}()
// start processing
// 执行 consumerGroupHandler 的 ConsumeClaim(), 开始执行业务消费
if err := s.handler.ConsumeClaim(s, claim); err != nil {
s.parent.handleError(err, topic, partition)
}
// ensure consumer is closed & drained
claim.AsyncClose()
for _, err := range claim.waitClosed() {
s.parent.handleError(err, topic, partition)
}
}
消费者释放
close就暂且不说了,主要就是资源的释放,别忘了最终offset的提交
注意defer的应用,可以放一些默认返回等等,还有就是小心close时遇到panic,可以加上recover
消费者消费中
这里主要是看一些commit、markOffset等操作
/* consumerGroup.go */
// 并没有真的立即执行操作,只是将信息写到了POM中
func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
if pom := s.offsets.findPOM(topic, partition); pom != nil {
pom.MarkOffset(offset, metadata)
}
}
func (s *consumerGroupSession) Commit() {
s.offsets.Commit()
}
// 并没有真的立即执行操作,只是将信息写到了POM中
func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
if pom := s.offsets.findPOM(topic, partition); pom != nil {
pom.ResetOffset(offset, metadata)
}
}
/* offsetManager.go */
func (om *offsetManager) Commit() {
om.flushToBroker()
om.releasePOMs(false)
}
// 刷新offset到broker中
func (om *offsetManager) flushToBroker() {
req := om.constructRequest()
if req == nil {
return
}
// 选取进行沟通的broker,注意点是尽可能减小锁的粒度,但同时不要忽略可用性校验
broker, err := om.coordinator()
if err != nil {
om.handleError(err)
return
}
resp, err := broker.CommitOffset(req)
if err != nil {
om.handleError(err)
om.releaseCoordinator(broker)
_ = broker.Close()
return
}
// 大量使用如此的异步消费方式,减少时间等待,值得借鉴
om.handleResponse(broker, req, resp)
}
// Releases/removes closed POMs once they are clean (or when forced)
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
om.pomsLock.Lock()
defer om.pomsLock.Unlock()
for topic, topicManagers := range om.poms {
for partition, pom := range topicManagers {
pom.lock.Lock()
releaseDue := pom.done && (force || !pom.dirty)
pom.lock.Unlock()
if releaseDue {
pom.release()
delete(om.poms[topic], partition)
if len(om.poms[topic]) == 0 {
delete(om.poms, topic)
}
}
}
remaining += len(om.poms[topic])
}
return
}
整理
再回头来看这些问题
• 如何消费消息
底层使用partitionConsumer,单起goroutine进行消息消费
• 如何处理rebalance,增减负责消费的partition
根据元数据中的分区数变化判断时候需要rebalance,直接重新进行consume,以应用变更
• 如何处理异常
多数采用异步读取errChan的方式获取错误信息并做相应处理
• 如何处理关闭信号
关闭时,使用ctx串联内部,以及close chennel,判断是否需要开始释放资源
• 如何和服务端通信
使用tcp方式通信,response使用channel监听,异步拿结果
• 如何控制消费者组的启停
使用ctx和错误channel控制
• 起停时都做了哪些工作
起:获取(刷新)元数据,leader还需要进行分配,初始化POM,执行钩子,开始消费
停:释放资源、退出消费者goroutine、执行钩子、提交未提交的offset、清理POM
剩下还有一小部分了,不过大体上与使用者比较能接触上的主要就是这些,深层机制可以后续再谈
下面看看发送者
发送者
先提几个问题
惯例,先提问题,再看代码
• 如何发送,是否还是依然partition为粒度?
• 如何控制阻塞,跟非阻塞发送者有什么区别?
• 释放资源时都做了什么?
• 如何处理错误(返回值),是否还是channel异步获取?
因为已经看了消费者的代码,可能很多问题已经有了预设,可以看下还有没有更多的细节
主要数据结构
因工作中主要使用同步发送者,这里主要看同步消费者,其实底层也是用的异步消费者,可谓一举两得
/* sync_producer.go*/
// SyncProducer 用于发送kafka消息,在收到确认回复前会一直阻塞。它负责将消息发送到正确的broker,
// 适当刷新元数据,并且会解析错误返回。当逃逸时,它并不一定会被垃圾回收自动清理掉,所以为了防止资源泄露,你需要调用 Close()。
// 使用 SyncProducer 有两个注意点:它通常比 AsyncProducer 效率低,并且在确认消息时提供的实际持久性保证取决于
// Producer.RequiredAcks 的配置值,在某些配置中,由 SyncProducer 确认的消息有时仍会丢失。
// 因为实现的原因, SyncProducer 需要`Producer.Return.Errors` and `Producer.Return.Successes`两个配置项设置为true
type SyncProducer interface {
// SendMessage 发送消息接口
SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
// SendMessage 批量发送消息接口,有一个错误就会返回error
SendMessages(msgs []*ProducerMessage) error
// Close 清理资源接口,记得一定要调用啊,否则会造成资源泄露
Close() error
}
// 一个异步发送者,一个waitGroup
type syncProducer struct {
producer *asyncProducer
wg sync.WaitGroup
}
发送者启动
发送者的机制还是很值得借鉴学习的,一个是层层传递,入口很单一,但是因为逻辑复杂度,需要从topic发送者分化成为多个partition发送者,又因为希望出口单一,并提高效率,所以又收拢为broker发送者,定时&定量阈值双开启的机制去发送,兼顾效率和复杂度,很值得学习
/* sync_producer.go*/
// NewSyncProducer 创建一个新的同步发送者
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
//...
// 其实还是起一个异步发送者
p, err := NewAsyncProducer(addrs, config)
if err != nil {
return nil, err
}
// 异步接受成功和错误信息,应该是通过阻塞返回值实现的了
return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}
// verifyProducerConfig 同步发送者 `config.Producer.Return.Errors` 和 `config.Producer.Return.Successes`
// 两个配置必须是要为true
func verifyProducerConfig(config *Config) error {
if !config.Producer.Return.Errors {
return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
}
if !config.Producer.Return.Successes {
return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
}
return nil
}
/* async_producer.go*/
// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
// 依然是需要创建一个client,刷新元数据等
client, err := NewClient(addrs, conf)
if err != nil {
return nil, err
}
return newAsyncProducer(client)
}
func newAsyncProducer(client Client) (AsyncProducer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
}
// 启动一个事务管理,保障生产者幂等性
txnmgr, err := newTransactionManager(client.Config(), client)
if err != nil {
return nil, err
}
p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
}
// 核心逻辑来了,go起来单体调度器
go withRecover(p.dispatcher)
// 调度器比较复杂,先看下重试机制
go withRecover(p.retryHandler)
return p, nil
}
// 先来看下重试机制(go起的单体)
// 基于 https://godoc.org/github.com/eapache/channels#InfiniteChannel 的设计,
// 有效地在 flushers 和 dispatcher 之间建立一个“桥梁”以避免死锁
func (p *asyncProducer) retryHandler() {
var msg *ProducerMessage
buf := queue.New()
for {
// 新建的一定为0,将retry的msg持有,当有新的retry过来时,将持有的顶部消息放回input,并释放原本持有的消息
if buf.Length() == 0 {
msg = <-p.retries
} else {
select {
case msg = <-p.retries:
case p.input <- buf.Peek().(*ProducerMessage):
buf.Remove()
continue
}
}
if msg == nil {
return
}
buf.Add(msg)
}
}
// 下面是重头戏,发送者的调度器
// go起来的单体;最外层消息调度器
func (p *asyncProducer) dispatcher() {
handlers := make(map[string]chan<- *ProducerMessage)
shuttingDown := false
// 最外层消息收集channel
for msg := range p.input {
if msg == nil {
Logger.Println("Something tried to send a nil message, it was ignored.")
continue
}
if msg.flags&shutdown != 0 {
shuttingDown = true
// shutdown的在处理消息-1
p.inFlight.Done()
continue
} else if msg.retries == 0 {
if shuttingDown {
// we can't just call returnError here because that decrements the wait group,
// which hasn't been incremented yet for this message, and shouldn't be
// 不能直接返回错误,这里还没有进行wg的+1操作,应该是结果读取那里也会有wg的操作,等到看完接受结果应该就清楚了
pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
if p.conf.Producer.Return.Errors {
p.errors <- pErr
} else {
Logger.Println(pErr)
}
continue
}
// 正常在处理消息+1
p.inFlight.Add(1)
}
// 执行拦截器,类似中间件,感觉这个带safe的都是加上一个recover的
for _, interceptor := range p.conf.Producer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}
//...
// 起topic发送者,没有则新建
handler := handlers[msg.Topic]
if handler == nil {
handler = p.newTopicProducer(msg.Topic)
handlers[msg.Topic] = handler
}
// 把消息扔到topic调度器里
handler <- msg
}
// 退出时释放资源
for _, handler := range handlers {
close(handler)
}
}
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
tp := &topicProducer{
parent: p,
topic: topic,
// 接收最外层调度器扔过来的消息,怎么处理看下面的调度器了
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
handlers: make(map[int32]chan<- *ProducerMessage),
partitioner: p.conf.Producer.Partitioner(topic),
}
// topic发送者的调度器
go withRecover(tp.dispatch)
return input
}
func (tp *topicProducer) dispatch() {
// 从topic发送者发过来的消息
for msg := range tp.input {
if msg.retries == 0 {
// 对消息分区,确定需要分发到哪个partition
if err := tp.partitionMessage(msg); err != nil {
tp.parent.returnError(msg, err)
continue
}
}
// 熟悉的配方,起partition发送者
handler := tp.handlers[msg.Partition]
if handler == nil {
handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
tp.handlers[msg.Partition] = handler
}
// 把消息扔到对应分区的消费者中
handler <- msg
}
for _, handler := range tp.handlers {
close(handler)
}
}
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
pp := &partitionProducer{
parent: p,
topic: topic,
partition: partition,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
}
// partition发送者调度器
go withRecover(pp.dispatch)
return input
}
func (pp *partitionProducer) dispatch() {
// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
// on the first message
// 选取要发送到的broker,没有则新建broker发送者
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
if pp.leader != nil {
// broker发送者的处理逻辑在此
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
// we're generating a syn message; track it so we don't shut down while it's still inflight
// 发一条同步消息;wg+1,防止还未发完成就关闭
pp.parent.inFlight.Add(1)
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
}
// 分区调度器结束时,主动释放broker发送者
defer func() {
if pp.brokerProducer != nil {
pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
}
}()
// 从分区发送者发过来的消息
for msg := range pp.input {
if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
// 安全保障,fail fast
select {
case <-pp.brokerProducer.abandoned:
// a message on the abandoned channel means that our current broker selection is out of date
Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
pp.brokerProducer = nil
time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
default:
// producer connection is still open.
}
}
// 高水位线值,标识目前最高重试次数
if msg.retries > pp.highWatermark {
// 重试次数刷新高水位线;退步重试
pp.newHighWatermark(msg.retries)
pp.backoff(msg.retries)
} else if pp.highWatermark > 0 {
// highWatermark值会随着重试次数增加,默认是0,所以这里是开始有重试的逻辑
if msg.retries < pp.highWatermark {
// 并非最高重试级别的结束重试消息
if msg.flags&fin == fin {
pp.retryState[msg.retries].expectChaser = false
// wg减一,可以准备释放了
pp.parent.inFlight.Done()
} else {
// 否则加入重试
pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
}
continue
} else if msg.flags&fin == fin {
// 最高重试级别的结束重试消息,清理重试缓存
pp.retryState[pp.highWatermark].expectChaser = false
pp.flushRetryBuffers()
pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
continue
}
}
// 检查brokerProducer,为nil就进行下初始化
if pp.brokerProducer == nil {
if err := pp.updateLeader(); err != nil {
pp.parent.returnError(msg, err)
pp.backoff(msg.retries)
continue
}
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}
// 现在能确定有可用的broker发送者了,开始生成序列号
// 所有消息重试时会记录重试次数。当然,忽略 syn/fin 这两种特殊消息,它们是用来同步发送者的
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
msg.hasSequence = true
}
// 把从partition发送者来的消息扔给broker发送者
pp.brokerProducer.input <- msg
}
}
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()
// 上锁操作,并发安全
bp := p.brokers[broker]
if bp == nil {
// 真实起一个broker发送者,处理逻辑在里面
bp = p.newBrokerProducer(broker)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
}
p.brokerRefs[bp]++
return bp
}
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
responses = make(chan *brokerProducerResponse)
)
bp := &brokerProducer{
parent: p,
broker: broker,
input: input,
output: bridge,
responses: responses,
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
go withRecover(bp.run)
// 这里读取的是上面创建的bridge结构,我们从partition发送者的调度中能看到消息是塞到input中的
// 那应该是在上面的run里将input中的消息转移到output的
go withRecover(func() {
for set := range bridge {
request := set.buildRequest()
// 真正的发送消息,总算到tcp了
response, err := broker.Produce(request)
// 这里并不是等待返回,只是将接受返回的结构放进去,异步读取;但如果发送过程有err,其实已经放入了
responses <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
}
close(responses)
})
if p.conf.Producer.Retry.Max <= 0 {
bp.abandoned = make(chan struct{})
}
return bp
}
func (bp *brokerProducer) run() {
var output chan<- *produceSet
Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
for {
select {
case msg, ok := <-bp.input:
if !ok {
Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
bp.shutdown()
return
}
if msg == nil {
continue
}
// 特殊消息处理
if msg.flags&syn == syn {
//...
continue
}
// 重试消息处理
if reason := bp.needsRetry(msg); reason != nil {
//...
continue
}
// 判断空间是否够用
if bp.buffer.wouldOverflow(msg) {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
// ...
// 这里只是将消息放入缓存,并没有真的发
if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
}
// 如果需要,重启定时器
if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
}
case <-bp.timer:
// 如果时间未到,会优先于下一步的发送阻塞住,如果时间到了,那就置标记,就会将output赋值,就可以真实发送了
bp.timerFired = true
case output <- bp.buffer:
// 如果output为nil,会阻塞,但是因为上面定时器的存在,要么先一步阻塞,要么output已经有值,不会阻塞
bp.rollOver()
case response, ok := <-bp.responses:
// 返回结果处理
if ok {
bp.handleResponse(response)
}
case <-bp.stopchan:
// 结束信号
Logger.Printf(
"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
return
}
// 定周期到了或者buffer中数量达到阈值,就给output赋值,可以真正的去发送了
if bp.timerFired || bp.buffer.readyToFlush() {
output = bp.output
} else {
output = nil
}
}
}
发送者发送
得益于上面机制暴露出口的单一、简单,发送过程就简单很多了,复杂的还是上面机制的理解
/* sync_producer.go*/
// SendMessage 介于刚才的机制,大家其实往发送者初始的input中扔消息就是了,但是跟异步发送者不同的是,
// 这里会将接受结果这一步做成阻塞的,从而达到等待结果返回的效果
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
expectation := make(chan *ProducerError, 1)
msg.expectation = expectation
sp.producer.Input() <- msg
if err := <-expectation; err != nil {
return -1, -1, err.Err
}
return msg.Partition, msg.Offset, nil
}
// SendMessages 发送多条只是起一个goroutine循环扔,但是有一个err就会返回err,这个需要注意
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
expectations := make(chan chan *ProducerError, len(msgs))
go func() {
for _, msg := range msgs {
expectation := make(chan *ProducerError, 1)
msg.expectation = expectation
sp.producer.Input() <- msg
expectations <- expectation
}
close(expectations)
}()
var errors ProducerErrors
for expectation := range expectations {
if err := <-expectation; err != nil {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return errors
}
return nil
}
发送者释放
释放没有什么特别要说的,主要注意下最外层的wg和异步发送者本身的inFlight wg两个的控制
用shutdown消息来深入通知资源回收,确实是个好方式,就是select的case顺序需要注意下
/* sync_producer.go*/
func (sp *syncProducer) Close() error {
sp.producer.AsyncClose()
sp.wg.Wait()
return nil
}
/* async_producer */
func (p *asyncProducer) AsyncClose() {
go withRecover(p.shutdown)
}
func (p *asyncProducer) shutdown() {
Logger.Println("Producer shutting down.")
p.inFlight.Add(1)
p.input <- &ProducerMessage{flags: shutdown}
p.inFlight.Wait()
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
close(p.input)
close(p.retries)
close(p.errors)
close(p.successes)
}
整理
回顾下问题
• 如何发送,是否还是依然partition为粒度?
先从topic发送者拆分成partition发送者,再收拢为broker发送者,确实达到过partition粒度
• 如何控制阻塞,跟非阻塞发送者有什么区别?
用结果的接收同步阻塞,跟非阻塞的发送者只是读取结果时机不同而已
• 释放资源时都做了什么?
外层wg主要用来回收成功结果处理和error结果处理两个goroutine
内部wg主要用来确保消息的处理正常完成
还有就是对于各个层级发送者和其调度器的回收
• 如何处理错误(返回值),是否还是channel异步获取?
依然是异步获取
发现bug两则
- 关闭还有可能继续接受消息的
producer
的回执消息处理chan
导致 panic fin
消息出现在需要重试发送消息时,但fin
消息不会触发重连broker
的逻辑。此时恰好遇到broker
断连需要更新,则会阻塞住,无法继续发送消息
总结
sarama库确实不亏是一个优秀的项目,很多地方值得我们学习,但是同时也有用数量判断是否需要Rebalance的"有风险"的机制存在,所以我们也不应该全部接纳,还是要保持自己的思考,取其精华为上
单体消费者和异步发送者暂不做阅读,主体的机制是类似的。