原项目Github 链接 :https://github.com/Shopify/sarama
sarama是一个go语言实现的kafka客户端,是官方推荐的kafka客户端之一
特点是简单,易用,很容易上手。再加上纯go语言开发,对于我们理解、排查问题以及魔改都比较方便。
目前github 8.2k stars

本文基于 sarama v1.31.1
个人注释版:https://github.com/freephenix/sarama
后续的更新会提交到这里,文章可能更新延迟

消费者

下面开始阅读源码学习,首先,我希望能够带着问题去寻找答案,这样更加容易找到一条线,避免阅读时迷失方向。
也给大家建议在阅读不熟悉的项目时找准一条线,从浅入深,一层层去看,就是常听到的剥洋葱式学习方式,我认为这样的本意也是避免迷失,能更高效的理解系统。

先提几个问题

首先,我知道消费者分两种,一种是消费者组,一种是单体的消费者。对于我来说,消费者组更常用,所以先从它开始。
在消费者组工作过程中,我根据自己的经验,可以想象到,它主要有四个工作
• 如何消费消息
• 如何处理rebalance,增减负责消费的partition
• 如何处理异常
• 如何处理关闭信号
还有就是隐含在这四个工作之下的一些问题
• 如何和服务端通信
• 如何控制消费者组的启停
• 起停时都做了哪些工作
• ...
所以下面在阅读时候,我会着重寻找这几个问题的答案

主要数据结构

文件位置 ./consumer_group.go

// 消费者组对外暴露的interface,new方法中返回的就是这个
type ConsumerGroup interface {
   // 需要传入给定的topic组,还有消费这些消息的接口实现ConsumerGroupHandler 
   // ConsumerGroupHandler 下面会讲到,暂时先看消费者组session内的生命周期步骤
   // 1. 加入此消费者组,服务端会公平的分配partition到当前消费者
   // 2. 程序启动前,会调用handler中的Setup()方法,可以通知用户和进行准备工作
   // 3. 任何消息都会调用handler的ConsumeClaim()方法。
   //   这个方法在每个独立的goroutine中需要是线程安全的,消费状态需要小心的维护
   // 4. 消费者退出有三种情况handler的ConsumeClaim()退出;
   //   服务端发起了一次rebalance;父级context触发了cancel()
   // 5. 当所有ConsumeClaim()退出时,handler的Cleanup()钩子函数会被调用,
   //   用以执行rebalance前的收尾工作
   // 6. 消费完成时,标记并提交offset
   // 有几点需要注意, 一旦rebalance触发,那么消息处理函数ConsumeClaim()必须在配置项
   // Config.Consumer.Group.Rebalance.Timeout规定的时间内退出,而且需要给offset提交
   // 和清理函数Cleanup()留出时间. 如果超时未完成,那么这个消费者会被kafka服务端从这个组中除名,
   // 导致消息offset提交失败。
   // 这个方法应该被放在无限循环中,因为当rebalance发生时,这个函数会退出,所以需要再次调用启动
   Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

   // Errors 接受sarama错误信息的channel,但是默认情况下,错误会直接打印日志而非发送到这里,
   // 如果需要的话,可以将Consumer.Return.Errors设置为true,然后就可以监听这个channel了
   Errors() <-chan error

   // Close 停止consumerGroup并将所有session分离出去. 
   // 为防止内存泄露,要在对象逃逸之前调用这个函数
   Close() error
}

type consumerGroup struct {
   // 通用kafka client 管理连接,必须close防止逃逸
   client Client
   // 配置项很多,暂不展开
   config   *Config
   // 管理真正的从broker消费消息的PartitionConsumer,必须close防止逃逸
   consumer Consumer
   // 组名
   groupID  string
   // ???
   memberID string
   // 接收错误的channel
   errors   chan error
   
   lock      sync.Mutex
   // 接收结束信号的channel
   closed    chan none
   // 保证只close一次
   closeOnce sync.Once

   // ???
   userData []byte
}

/*******************************************************************/

// ConsumerGroupSession 代表了一个consumerGroup的用户生命周期
type ConsumerGroupSession interface {
   // Claims 返回topic到被分配的partition数组的map
   Claims() map[string][]int32

   // MemberID 返回集群中的memberID
   MemberID() string

   // GenerationID 返回当前的GenerationID,这个字段类似选举的group Leader的版本号
   GenerationID() int32

   // MarkOffset 标记offset,同时在metadata中记录一个字符串,用来标识当时partition
   // consumer的状态,其他的consumer可以根据这个字符串恢复消费
   //
   // 按照上传约定, 调用这个方法时,你是在标记预期要读取的消息,而不是刚读取到的消息。
   // 也正因此,调用这个方法时,你需要传入的参数是上一个消息的offset+1
   //
   // 注意:因为效率的原因,调用 MarkOffset时并不会立即执行消息的提交,所以如果程序崩溃时,有可能一些消息
   // 并没有提交成功。也就是说你的程序有可能两次接受同样的一个消息,所以需要你的程序保证幂等性。
   MarkOffset(topic string, partition int32, offset int64, metadata string)

   // Commit 就是我们熟知的消息提交了
   // 注意: 调用Commit是一个同步阻塞的操作
   Commit()

   // ResetOffset 根据提供的值重置offset,同时在metadata中记录一个字符串,用来标识当时partition
   // consumer的状态。重置操作对应 MarkOffset,不同之处在于,它可以传入一个比当前offset小的值,而
   // MarkOffset只允许传入比当前offset大的值,详情请查阅 MarkOffset。
   //(cf. 是一个拉丁语衍生的(也是现代英语)词汇confer的简写,表示“比较”或“查阅”的意思。 主要用于普通法和成文法文本中,还有学术著作。)
   ResetOffset(topic string, partition int32, offset int64, metadata string)

   // MarkMessage 标记一个消息已经消费了。
   MarkMessage(msg *ConsumerMessage, metadata string)

   // Context 返回session中的context
   Context() context.Context
}

type consumerGroupSession struct {
   // 父级对象 consumerGroup
   parent       *consumerGroup
   memberID     string
   generationID int32
   handler      ConsumerGroupHandler

   claims  map[string][]int32
   // 处理offset相关功能,如提交、POM写入、POM解析
   offsets *offsetManager
   ctx     context.Context
   cancel  func()

   waitGroup       sync.WaitGroup
   // 保证资源清理只做一次
   releaseOnce     sync.Once
   // 心跳状态通知channel
   hbDying, hbDead chan none
}

/*******************************************************************/

// ConsumerGroupHandler 处理独立的topic或者分区的消息。同时也提供了钩子方法,允许你在session生命周期
// 的开始和结束时嵌入逻辑
// 请注意,这个handler会在多个不同的goroutine中调用,请确保状态是并发安全的,并且不存在数据竞争
type ConsumerGroupHandler interface {
   // Setup 在session的开始阶段运行, 早于 ConsumeClaim.
   Setup(ConsumerGroupSession) error

   // Cleanup 在session的结束阶段运行, 在所有 ConsumeClaim 的goroutines 退出之后、但在最后
   // 一次offset提交之前,调用且只调用一次
   Cleanup(ConsumerGroupSession) error

   // ConsumeClaim 必须起一个ConsumerGroupClaim的Messages()的循环,一旦Messages()关闭时,
   // Handler 必须结束循环并退出
   ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

// ConsumerGroupClaim 在一个consumerGroup之内,处理一个topic和partition中的消息.
type ConsumerGroupClaim interface {
   // Topic 返回消费的topic
   Topic() string

   // Partition 返回消费的partition
   Partition() int32

   // InitialOffset 返回开始消费时初始化的offset
   InitialOffset() int64

   // HighWaterMarkOffset 返回服务端产生的下一条消息的offset(并不是客户端下一条接受的)。
   // 你可以根据这个判断自己的处理程序落后了多少
   //(i.e. 换句话说)
   HighWaterMarkOffset() int64

   // Messages 返回从broker读消息的channel。这个channel当新的rebalance触发时会关闭. 你必须在
   // Config.Consumer.Group.Session.Timeout 约定的时间内完成退出和标记offsets,这样才能让
   // topic/partition重新分配成为消费组中的新成员
   Messages() <-chan *ConsumerMessage
}

type consumerGroupClaim struct {
   topic     string
   partition int32
   offset    int64
   PartitionConsumer
}

以上是consumerGroup.go中三个接口类型和相应的几个结构体
通过对这些注释进行分析结合我们自己的使用,我们可以得出以下一些结论
1、consumerGroup是最外层的结构体,负责消费者组的起停
(1)其结构体内有负责管理连接的client
(2)实际消费用的是consumer结构体,真实消费消息的是consumer内的partitionConsumer
2、ConsumerGroupHandler是消息处理者的接口,用于对接业务逻辑和消费相关功能对接,还提供了启动开始和结束时两个钩子函数
3、ConsumerGroupSession是代表消费者组生命周期的结构,提供了一些列获取、控制消费者组状态信息的方法
4、ConsumerGroupClaim是消息的出口结构,主要功能就是用于消息的消费,提供消息的内容

消费者组启动

如何启动其实在对于结构体的注释阅读已经明白了,那我们主要要看的就是正常使用的启动时,那些方法都做了哪些事情
首先,是初始化

/* consumerGroup.go */

// NewConsumerGroup 主要工作是启动一个client,client的主要职责就是跟broker建tcp连接,保持更新元数据
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
   // 起客户端,与broker保持通信,主要维护元数据
   client, err := NewClient(addrs, config)
   if err != nil {
      return nil, err
   }

   // 起消费者组,主要就是把client组装进去
   c, err := newConsumerGroup(groupID, client)
   if err != nil {
      _ = client.Close()
   }
   return c, err
}

// ...

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
   config := client.Config()
   if !config.Version.IsAtLeast(V0_10_2_0) {
      return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
   }

   consumer, err := NewConsumerFromClient(client)
   if err != nil {
      return nil, err
   }

   return &consumerGroup{
      client:   client,
      consumer: consumer,
      config:   config,
      groupID:  groupID,
      errors:   make(chan error, config.ChannelBufferSize),
      closed:   make(chan none),
   }, nil
}

/* client.go */


// NewClient 创建一个新的客户端,会随机选一个broker地址,自动拉取元数据信息,如果所有broker都拿不到地址,那客户端创建失败
func NewClient(addrs []string, conf *Config) (Client, error) {
   Logger.Println("Initializing new client")

   if conf == nil {
      conf = NewConfig()
   }

   if err := conf.Validate(); err != nil {
      return nil, err
   }

   if len(addrs) < 1 {
      return nil, ConfigurationError("You must provide at least one broker address")
   }

   client := &client{
      conf:                    conf,
      closer:                  make(chan none),
      closed:                  make(chan none),
      brokers:                 make(map[int32]*Broker),
      metadata:                make(map[string]map[int32]*PartitionMetadata),
      metadataTopics:          make(map[string]none),
      cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
      coordinators:            make(map[string]int32),
   }

   // 多个broker时,打乱顺序
   client.randomizeSeedBrokers(addrs)

   // 默认为true,会多占些空间,不过会提前加载好元数据信息
   // 为false时,则只会在下方开的goroutine里定时刷新元数据
   if conf.Metadata.Full {
      // 刷新元数据,建立tcp连接,通信拿取元数据
      err := client.RefreshMetadata()
      switch err {
      case nil:
         break
      case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
         Logger.Println(err)
      default:
         close(client.closed) // we haven't started the background updater yet, so we have to do this manually
         _ = client.Close()
         return nil, err
      }
   }
   // 起定时器,定期更新元数据
   go withRecover(client.backgroundMetadataUpdater)

   Logger.Println("Successfully initialized new client")

   return client, nil
}

/* broker.go */
// 在 client.RefreshMetadata() 中会有与broker建连的过程,这里简单提下

// Open 并发方式尝试去创建一个broker连接,如果已经连接过则会报错,并不会阻塞在这里,但是任何后续的broker操作都会阻塞等待连接成功或失败
// 想要获取返回结果,可以调用函数 Connected(),唯一直接返回的错误是配置项校验。没有配置项写入时会使用 NewConfig()的默认配置。
func (b *Broker) Open(conf *Config) error {
   if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
      return ErrAlreadyConnected
   }

   if conf == nil {
      conf = NewConfig()
   }

   err := conf.Validate()
   if err != nil {
      return err
   }

   b.lock.Lock()

   go withRecover(func() {
      defer b.lock.Unlock()

      dialer := conf.getDialer()
      // 建立一个tcp连接,尝试拨号
      b.conn, b.connErr = dialer.Dial("tcp", b.addr)
      if b.connErr != nil {
         Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
         b.conn = nil
         atomic.StoreInt32(&b.opened, 0)
         return
      }
      if conf.Net.TLS.Enable {
         // 包装为传输层安全的客户端
         b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
      }

      b.conn = newBufConn(b.conn)
      b.conf = conf

      // ...
      
      b.done = make(chan bool)
      b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)

      if b.id >= 0 {
         Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
      } else {
         Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
      }
      // 监听返回内容,这里是每个broker中维护的处理response的地方
      go withRecover(b.responseReceiver)
   })

   return nil
}


func (b *Broker) responseReceiver() {
   var dead error
   // 无尽循环读取内容,直到Close()中b.responses被关闭
   for response := range b.responses {
      // 根据内容本身,选择分发到response.errors 或者 response.packets
      if dead != nil {
         // This was previously incremented in send() and
         // we are not calling updateIncomingCommunicationMetrics()
         b.addRequestInFlightMetrics(-1)
         response.errors <- dead
         continue
      }

      headerLength := getHeaderLength(response.headerVersion)
      header := make([]byte, headerLength)

      bytesReadHeader, err := b.readFull(header)
      requestLatency := time.Since(response.requestTime)
      if err != nil {
         b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
         dead = err
         response.errors <- err
         continue
      }

      decodedHeader := responseHeader{}
      err = versionedDecode(header, &decodedHeader, response.headerVersion)
      if err != nil {
         b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
         dead = err
         response.errors <- err
         continue
      }
      // correlationID 是发送时写入进去的(自增)关联ID,可以代表发送的顺序
      if decodedHeader.correlationID != response.correlationID {
         b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
         // TODO if decoded ID < cur ID, discard until we catch up
         // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
         dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
         response.errors <- dead
         continue
      }

      buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
      bytesReadBody, err := b.readFull(buf)
      b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
      if err != nil {
         dead = err
         response.errors <- err
         continue
      }

      response.packets <- buf
   }
   close(b.done)
}

然后,是consumerGroup.Consume()启动消费

/* consumerGroup.go */

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
   // Ensure group is not closed
   // fail fast
   select {
   case <-c.closed:
      return ErrClosedConsumerGroup
   default:
   }

   c.lock.Lock()
   defer c.lock.Unlock()

   // Quick exit when no topics are provided
   if len(topics) == 0 {
      return fmt.Errorf("no topics provided")
   }

   // Refresh metadata for requested topics
   // 刷新下元数据
   if err := c.client.RefreshMetadata(topics...); err != nil {
      return err
   }

   // Init session
   // 初始化 consumerGroupSession,启动真正的消费
   sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
   if err == ErrClosedClient {
      return ErrClosedConsumerGroup
   } else if err != nil {
      return err
   }

   // 无限循环检查分区数是否变化,决定是否rebalance,这里有漏洞,如果有起有停,那就不会触发
   go c.loopCheckPartitionNumbers(topics, sess)

   // Wait for session exit signal
   <-sess.ctx.Done()

   // Gracefully release session claims
   return sess.release(true)
}


func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
   // 选定一个broker,同步元数据、进行各种请求之类的
   coordinator, err := c.client.Coordinator(c.groupID)
   if err != nil {
      if retries <= 0 {
         return nil, err
      }
      // 重试机制,次数默认4
      return c.retryNewSession(ctx, topics, handler, retries, true)
   }

   // Join consumer group
   // 发送加入组中的请求
   join, err := c.joinGroupRequest(coordinator, topics)
   if err != nil {
      _ = coordinator.Close()
      return nil, err
   }
   switch join.Err {
       //...
   }

   // memberID 看来是发送加组请求后分配的唯一标识
   var plan BalanceStrategyPlan
   // 如果是leader则需要根据分配策略算出一个分配计划
   if join.LeaderId == join.MemberId {
      members, err := join.GetMembers()
      if err != nil {
         return nil, err
      }

      // 按配置中的策略分配,默认 BalanceStrategyRange
      plan, err = c.balance(members)
      if err != nil {
         return nil, err
      }
   }

   // 看来类似ack,只是如果是leader的话,还需要将分配完的计划返回
   groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
   if err != nil {
      _ = coordinator.Close()
      return nil, err
   }
   switch groupRequest.Err {
      //...
   }

   // Retrieve and sort claims
   var claims map[string][]int32
   if len(groupRequest.MemberAssignment) > 0 {
      members, err := groupRequest.GetMemberAssignment()
      if err != nil {
         return nil, err
      }
      claims = members.Topics
      c.userData = members.UserData

      for _, partitions := range claims {
         sort.Sort(int32Slice(partitions))
      }
   }

   return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}



func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
   // init offset manager
   offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
   if err != nil {
      return nil, err
   }

   // init context
   ctx, cancel := context.WithCancel(ctx)

   // init session
   sess := &consumerGroupSession{
      parent:       parent,
      memberID:     memberID,
      generationID: generationID,
      handler:      handler,
      offsets:      offsets,
      claims:       claims,
      ctx:          ctx,
      cancel:       cancel,
      hbDying:      make(chan none),
      hbDead:       make(chan none),
   }

   // start heartbeat loop
   // 开始心跳,检测状态、处理错误,内有重试机制
   go sess.heartbeatLoop()

   // create a POM for each claim
   // 发送请求,获取初始offset等信息
   for topic, partitions := range claims {
      for _, partition := range partitions {
         pom, err := offsets.ManagePartition(topic, partition)
         if err != nil {
            _ = sess.release(false)
            return nil, err
         }

         // handle POM errors
         go func(topic string, partition int32) {
            for err := range pom.Errors() {
               sess.parent.handleError(err, topic, partition)
            }
         }(topic, partition)
      }
   }

   // perform setup
   // 调用 ConsumerGroupHandler 的 Setup() 开始消费前的钩子
   if err := handler.Setup(sess); err != nil {
      // 无处不在的有问题就释放,可以用defer吧?
      _ = sess.release(true)
      return nil, err
   }

   // start consuming
   for topic, partitions := range claims {
      for _, partition := range partitions {
         sess.waitGroup.Add(1)

         go func(topic string, partition int32) {
            defer sess.waitGroup.Done()

            // cancel the as session as soon as the first
            // goroutine exits
            defer sess.cancel()

            // consume a single topic/partition, blocking
            sess.consume(topic, partition)
         }(topic, partition)
      }
   }
   return sess, nil
}



func (s *consumerGroupSession) consume(topic string, partition int32) {
   // quick exit if rebalance is due
   select {
   case <-s.ctx.Done():
      return
   case <-s.parent.closed:
      return
   default:
   }

   // get next offset
   offset := s.parent.config.Consumer.Offsets.Initial
   if pom := s.offsets.findPOM(topic, partition); pom != nil {
      offset, _ = pom.NextOffset()
   }

   // create new claim
   // 带着初始化的offset创建新的claim,起一个分区的消费者,
   // 真正的消费者是建立在topic下的partition上的,partitionConsumer
   claim, err := newConsumerGroupClaim(s, topic, partition, offset)
   if err != nil {
      s.parent.handleError(err, topic, partition)
      return
   }

   // handle errors
   go func() {
      for err := range claim.Errors() {
         s.parent.handleError(err, topic, partition)
      }
   }()

   // trigger close when session is done
   // 再次判断是否close
   go func() {
      select {
      case <-s.ctx.Done():
      case <-s.parent.closed:
      }
      claim.AsyncClose()
   }()

   // start processing
   // 执行 consumerGroupHandler 的 ConsumeClaim(), 开始执行业务消费
   if err := s.handler.ConsumeClaim(s, claim); err != nil {
      s.parent.handleError(err, topic, partition)
   }

   // ensure consumer is closed & drained
   claim.AsyncClose()
   for _, err := range claim.waitClosed() {
      s.parent.handleError(err, topic, partition)
   }
}

消费者释放

close就暂且不说了,主要就是资源的释放,别忘了最终offset的提交
注意defer的应用,可以放一些默认返回等等,还有就是小心close时遇到panic,可以加上recover

消费者消费中

这里主要是看一些commit、markOffset等操作

/* consumerGroup.go */

// 并没有真的立即执行操作,只是将信息写到了POM中
func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
   if pom := s.offsets.findPOM(topic, partition); pom != nil {
      pom.MarkOffset(offset, metadata)
   }
}

func (s *consumerGroupSession) Commit() {
   s.offsets.Commit()
}

// 并没有真的立即执行操作,只是将信息写到了POM中 
func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
   if pom := s.offsets.findPOM(topic, partition); pom != nil {
      pom.ResetOffset(offset, metadata)
   }
}

/* offsetManager.go */

func (om *offsetManager) Commit() {
   om.flushToBroker()
   om.releasePOMs(false)
}

// 刷新offset到broker中
func (om *offsetManager) flushToBroker() {
   req := om.constructRequest()
   if req == nil {
      return
   }

   // 选取进行沟通的broker,注意点是尽可能减小锁的粒度,但同时不要忽略可用性校验
   broker, err := om.coordinator()
   if err != nil {
      om.handleError(err)
      return
   }

   resp, err := broker.CommitOffset(req)
   if err != nil {
      om.handleError(err)
      om.releaseCoordinator(broker)
      _ = broker.Close()
      return
   }
   // 大量使用如此的异步消费方式,减少时间等待,值得借鉴
   om.handleResponse(broker, req, resp)
}

// Releases/removes closed POMs once they are clean (or when forced)
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
   om.pomsLock.Lock()
   defer om.pomsLock.Unlock()

   for topic, topicManagers := range om.poms {
      for partition, pom := range topicManagers {
         pom.lock.Lock()
         releaseDue := pom.done && (force || !pom.dirty)
         pom.lock.Unlock()

         if releaseDue {
            pom.release()

            delete(om.poms[topic], partition)
            if len(om.poms[topic]) == 0 {
               delete(om.poms, topic)
            }
         }
      }
      remaining += len(om.poms[topic])
   }
   return
}

整理

再回头来看这些问题
• 如何消费消息
底层使用partitionConsumer,单起goroutine进行消息消费
• 如何处理rebalance,增减负责消费的partition
根据元数据中的分区数变化判断时候需要rebalance,直接重新进行consume,以应用变更
• 如何处理异常
多数采用异步读取errChan的方式获取错误信息并做相应处理
• 如何处理关闭信号
关闭时,使用ctx串联内部,以及close chennel,判断是否需要开始释放资源
• 如何和服务端通信
使用tcp方式通信,response使用channel监听,异步拿结果
• 如何控制消费者组的启停
使用ctx和错误channel控制
• 起停时都做了哪些工作
起:获取(刷新)元数据,leader还需要进行分配,初始化POM,执行钩子,开始消费
停:释放资源、退出消费者goroutine、执行钩子、提交未提交的offset、清理POM

剩下还有一小部分了,不过大体上与使用者比较能接触上的主要就是这些,深层机制可以后续再谈
下面看看发送者

发送者

先提几个问题

惯例,先提问题,再看代码
• 如何发送,是否还是依然partition为粒度?
• 如何控制阻塞,跟非阻塞发送者有什么区别?
• 释放资源时都做了什么?
• 如何处理错误(返回值),是否还是channel异步获取?
因为已经看了消费者的代码,可能很多问题已经有了预设,可以看下还有没有更多的细节
主要数据结构
因工作中主要使用同步发送者,这里主要看同步消费者,其实底层也是用的异步消费者,可谓一举两得

/* sync_producer.go*/

// SyncProducer 用于发送kafka消息,在收到确认回复前会一直阻塞。它负责将消息发送到正确的broker,
// 适当刷新元数据,并且会解析错误返回。当逃逸时,它并不一定会被垃圾回收自动清理掉,所以为了防止资源泄露,你需要调用 Close()。
// 使用 SyncProducer 有两个注意点:它通常比 AsyncProducer 效率低,并且在确认消息时提供的实际持久性保证取决于
// Producer.RequiredAcks 的配置值,在某些配置中,由 SyncProducer 确认的消息有时仍会丢失。
// 因为实现的原因, SyncProducer 需要`Producer.Return.Errors` and `Producer.Return.Successes`两个配置项设置为true
type SyncProducer interface {

   // SendMessage 发送消息接口
   SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)

   // SendMessage 批量发送消息接口,有一个错误就会返回error
   SendMessages(msgs []*ProducerMessage) error

   // Close 清理资源接口,记得一定要调用啊,否则会造成资源泄露
   Close() error
}

// 一个异步发送者,一个waitGroup
type syncProducer struct {
   producer *asyncProducer
   wg       sync.WaitGroup
}

发送者启动

发送者的机制还是很值得借鉴学习的,一个是层层传递,入口很单一,但是因为逻辑复杂度,需要从topic发送者分化成为多个partition发送者,又因为希望出口单一,并提高效率,所以又收拢为broker发送者,定时&定量阈值双开启的机制去发送,兼顾效率和复杂度,很值得学习

/* sync_producer.go*/

// NewSyncProducer 创建一个新的同步发送者
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
   
   //...

   // 其实还是起一个异步发送者
   p, err := NewAsyncProducer(addrs, config)
   if err != nil {
      return nil, err
   }
   // 异步接受成功和错误信息,应该是通过阻塞返回值实现的了
   return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}

// verifyProducerConfig 同步发送者 `config.Producer.Return.Errors` 和 `config.Producer.Return.Successes`
// 两个配置必须是要为true
func verifyProducerConfig(config *Config) error {
   if !config.Producer.Return.Errors {
      return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
   }
   if !config.Producer.Return.Successes {
      return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
   }
   return nil
}

/* async_producer.go*/

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
   // 依然是需要创建一个client,刷新元数据等
   client, err := NewClient(addrs, conf)
   if err != nil {
      return nil, err
   }
   return newAsyncProducer(client)
}


func newAsyncProducer(client Client) (AsyncProducer, error) {
   // Check that we are not dealing with a closed Client before processing any other arguments
   if client.Closed() {
      return nil, ErrClosedClient
   }

   // 启动一个事务管理,保障生产者幂等性
   txnmgr, err := newTransactionManager(client.Config(), client)
   if err != nil {
      return nil, err
   }

   p := &asyncProducer{
      client:     client,
      conf:       client.Config(),
      errors:     make(chan *ProducerError),
      input:      make(chan *ProducerMessage),
      successes:  make(chan *ProducerMessage),
      retries:    make(chan *ProducerMessage),
      brokers:    make(map[*Broker]*brokerProducer),
      brokerRefs: make(map[*brokerProducer]int),
      txnmgr:     txnmgr,
   }

   // 核心逻辑来了,go起来单体调度器
   go withRecover(p.dispatcher)
   // 调度器比较复杂,先看下重试机制
   go withRecover(p.retryHandler)

   return p, nil
}

// 先来看下重试机制(go起的单体)
// 基于 https://godoc.org/github.com/eapache/channels#InfiniteChannel 的设计,
// 有效地在 flushers 和 dispatcher 之间建立一个“桥梁”以避免死锁
func (p *asyncProducer) retryHandler() {
   var msg *ProducerMessage
   buf := queue.New()

   for {
      // 新建的一定为0,将retry的msg持有,当有新的retry过来时,将持有的顶部消息放回input,并释放原本持有的消息
      if buf.Length() == 0 {
         msg = <-p.retries
      } else {
         select {
         case msg = <-p.retries:
         case p.input <- buf.Peek().(*ProducerMessage):
            buf.Remove()
            continue
         }
      }

      if msg == nil {
         return
      }

      buf.Add(msg)
   }
}


// 下面是重头戏,发送者的调度器

// go起来的单体;最外层消息调度器
func (p *asyncProducer) dispatcher() {
   handlers := make(map[string]chan<- *ProducerMessage)
   shuttingDown := false

   // 最外层消息收集channel
   for msg := range p.input {
      if msg == nil {
         Logger.Println("Something tried to send a nil message, it was ignored.")
         continue
      }

      if msg.flags&shutdown != 0 {
         shuttingDown = true
         // shutdown的在处理消息-1
         p.inFlight.Done()
         continue
      } else if msg.retries == 0 {
         if shuttingDown {
            // we can't just call returnError here because that decrements the wait group,
            // which hasn't been incremented yet for this message, and shouldn't be
            // 不能直接返回错误,这里还没有进行wg的+1操作,应该是结果读取那里也会有wg的操作,等到看完接受结果应该就清楚了
            pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
            if p.conf.Producer.Return.Errors {
               p.errors <- pErr
            } else {
               Logger.Println(pErr)
            }
            continue
         }
         // 正常在处理消息+1
         p.inFlight.Add(1)
      }

      // 执行拦截器,类似中间件,感觉这个带safe的都是加上一个recover的
      for _, interceptor := range p.conf.Producer.Interceptors {
         msg.safelyApplyInterceptor(interceptor)
      }

      //...

      // 起topic发送者,没有则新建
      handler := handlers[msg.Topic]
      if handler == nil {
         handler = p.newTopicProducer(msg.Topic)
         handlers[msg.Topic] = handler
      }

      // 把消息扔到topic调度器里
      handler <- msg
   }
   // 退出时释放资源
   for _, handler := range handlers {
      close(handler)
   }
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
   input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
   tp := &topicProducer{
      parent:      p,
      topic:       topic,
      // 接收最外层调度器扔过来的消息,怎么处理看下面的调度器了
      input:       input,
      breaker:     breaker.New(3, 1, 10*time.Second),
      handlers:    make(map[int32]chan<- *ProducerMessage),
      partitioner: p.conf.Producer.Partitioner(topic),
   }
   // topic发送者的调度器
   go withRecover(tp.dispatch)
   return input
}

func (tp *topicProducer) dispatch() {
   // 从topic发送者发过来的消息
   for msg := range tp.input {
      if msg.retries == 0 {
         // 对消息分区,确定需要分发到哪个partition
         if err := tp.partitionMessage(msg); err != nil {
            tp.parent.returnError(msg, err)
            continue
         }
      }

      // 熟悉的配方,起partition发送者
      handler := tp.handlers[msg.Partition]
      if handler == nil {
         handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
         tp.handlers[msg.Partition] = handler
      }

      // 把消息扔到对应分区的消费者中
      handler <- msg
   }

   for _, handler := range tp.handlers {
      close(handler)
   }
}


func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
   input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
   pp := &partitionProducer{
      parent:    p,
      topic:     topic,
      partition: partition,
      input:     input,
      breaker:    breaker.New(3, 1, 10*time.Second),
      retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
   }
   // partition发送者调度器
   go withRecover(pp.dispatch)
   return input
}


func (pp *partitionProducer) dispatch() {
   // try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
   // on the first message
   // 选取要发送到的broker,没有则新建broker发送者
   pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
   if pp.leader != nil {
      // broker发送者的处理逻辑在此
      pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
      // we're generating a syn message; track it so we don't shut down while it's still inflight
      // 发一条同步消息;wg+1,防止还未发完成就关闭
      pp.parent.inFlight.Add(1)
      pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
   }

   // 分区调度器结束时,主动释放broker发送者
   defer func() {
      if pp.brokerProducer != nil {
         pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
      }
   }()

   // 从分区发送者发过来的消息
   for msg := range pp.input {
      if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
         // 安全保障,fail fast
         select {
         case <-pp.brokerProducer.abandoned:
            // a message on the abandoned channel means that our current broker selection is out of date
            Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
            pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
            pp.brokerProducer = nil
            time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
         default:
            // producer connection is still open.
         }
      }
      // 高水位线值,标识目前最高重试次数
      if msg.retries > pp.highWatermark {
         // 重试次数刷新高水位线;退步重试
         pp.newHighWatermark(msg.retries)
         pp.backoff(msg.retries)
      } else if pp.highWatermark > 0 {
         // highWatermark值会随着重试次数增加,默认是0,所以这里是开始有重试的逻辑
         if msg.retries < pp.highWatermark {
            // 并非最高重试级别的结束重试消息
            if msg.flags&fin == fin {
               pp.retryState[msg.retries].expectChaser = false
               // wg减一,可以准备释放了
               pp.parent.inFlight.Done()
            } else {
               // 否则加入重试
               pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
            }
            continue
         } else if msg.flags&fin == fin {
            // 最高重试级别的结束重试消息,清理重试缓存
            pp.retryState[pp.highWatermark].expectChaser = false
            pp.flushRetryBuffers()
            pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
            continue
         }
      }

      // 检查brokerProducer,为nil就进行下初始化
      if pp.brokerProducer == nil {
         if err := pp.updateLeader(); err != nil {
            pp.parent.returnError(msg, err)
            pp.backoff(msg.retries)
            continue
         }
         Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
      }

      // 现在能确定有可用的broker发送者了,开始生成序列号
      // 所有消息重试时会记录重试次数。当然,忽略 syn/fin 这两种特殊消息,它们是用来同步发送者的
      if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
         msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
         msg.hasSequence = true
      }

      // 把从partition发送者来的消息扔给broker发送者
      pp.brokerProducer.input <- msg
   }
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
   p.brokerLock.Lock()
   defer p.brokerLock.Unlock()
   // 上锁操作,并发安全
   bp := p.brokers[broker]

   if bp == nil {
      // 真实起一个broker发送者,处理逻辑在里面
      bp = p.newBrokerProducer(broker)
      p.brokers[broker] = bp
      p.brokerRefs[bp] = 0
   }

   p.brokerRefs[bp]++

   return bp
}

func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
   var (
      input     = make(chan *ProducerMessage)
      bridge    = make(chan *produceSet)
      responses = make(chan *brokerProducerResponse)
   )

   bp := &brokerProducer{
      parent:         p,
      broker:         broker,
      input:          input,
      output:         bridge,
      responses:      responses,
      stopchan:       make(chan struct{}),
      buffer:         newProduceSet(p),
      currentRetries: make(map[string]map[int32]error),
   }
   go withRecover(bp.run)

   // 这里读取的是上面创建的bridge结构,我们从partition发送者的调度中能看到消息是塞到input中的
   // 那应该是在上面的run里将input中的消息转移到output的
   go withRecover(func() {
      for set := range bridge {
         request := set.buildRequest()
         // 真正的发送消息,总算到tcp了
         response, err := broker.Produce(request)
         // 这里并不是等待返回,只是将接受返回的结构放进去,异步读取;但如果发送过程有err,其实已经放入了
         responses <- &brokerProducerResponse{
            set: set,
            err: err,
            res: response,
         }
      }
      close(responses)
   })

   if p.conf.Producer.Retry.Max <= 0 {
      bp.abandoned = make(chan struct{})
   }

   return bp
}


func (bp *brokerProducer) run() {
   var output chan<- *produceSet
   Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

   for {
      select {
      case msg, ok := <-bp.input:
         if !ok {
            Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
            bp.shutdown()
            return
         }

         if msg == nil {
            continue
         }
         // 特殊消息处理
         if msg.flags&syn == syn {
            //...
            continue
         }
         // 重试消息处理
         if reason := bp.needsRetry(msg); reason != nil {
            //...
            continue
         }
         // 判断空间是否够用
         if bp.buffer.wouldOverflow(msg) {
            Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
            if err := bp.waitForSpace(msg, false); err != nil {
               bp.parent.retryMessage(msg, err)
               continue
            }
         }

         // ... 
         
         // 这里只是将消息放入缓存,并没有真的发
         if err := bp.buffer.add(msg); err != nil {
            bp.parent.returnError(msg, err)
            continue
         }

         // 如果需要,重启定时器
         if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
            bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
         }
      case <-bp.timer:
         // 如果时间未到,会优先于下一步的发送阻塞住,如果时间到了,那就置标记,就会将output赋值,就可以真实发送了
         bp.timerFired = true
      case output <- bp.buffer:
         // 如果output为nil,会阻塞,但是因为上面定时器的存在,要么先一步阻塞,要么output已经有值,不会阻塞
         bp.rollOver()
      case response, ok := <-bp.responses:
         // 返回结果处理
         if ok {
            bp.handleResponse(response)
         }
      case <-bp.stopchan:
         // 结束信号
         Logger.Printf(
            "producer/broker/%d run loop asked to stop\n", bp.broker.ID())
         return
      }

      // 定周期到了或者buffer中数量达到阈值,就给output赋值,可以真正的去发送了
      if bp.timerFired || bp.buffer.readyToFlush() {
         output = bp.output
      } else {
         output = nil
      }
   }
}

发送者发送

得益于上面机制暴露出口的单一、简单,发送过程就简单很多了,复杂的还是上面机制的理解

/* sync_producer.go*/

// SendMessage 介于刚才的机制,大家其实往发送者初始的input中扔消息就是了,但是跟异步发送者不同的是,
// 这里会将接受结果这一步做成阻塞的,从而达到等待结果返回的效果
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
   expectation := make(chan *ProducerError, 1)
   msg.expectation = expectation
   sp.producer.Input() <- msg
   
   if err := <-expectation; err != nil {
      return -1, -1, err.Err
   }

   return msg.Partition, msg.Offset, nil
}

// SendMessages 发送多条只是起一个goroutine循环扔,但是有一个err就会返回err,这个需要注意
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
   expectations := make(chan chan *ProducerError, len(msgs))
   go func() {
      for _, msg := range msgs {
         expectation := make(chan *ProducerError, 1)
         msg.expectation = expectation
         sp.producer.Input() <- msg
         expectations <- expectation
      }
      close(expectations)
   }()

   var errors ProducerErrors
   for expectation := range expectations {
      if err := <-expectation; err != nil {
         errors = append(errors, err)
      }
   }

   if len(errors) > 0 {
      return errors
   }
   return nil
}

发送者释放

释放没有什么特别要说的,主要注意下最外层的wg和异步发送者本身的inFlight wg两个的控制
用shutdown消息来深入通知资源回收,确实是个好方式,就是select的case顺序需要注意下

/* sync_producer.go*/
func (sp *syncProducer) Close() error {
   sp.producer.AsyncClose()
   sp.wg.Wait()
   return nil
}

/* async_producer */
func (p *asyncProducer) AsyncClose() {
   go withRecover(p.shutdown)
}

func (p *asyncProducer) shutdown() {
   Logger.Println("Producer shutting down.")
   p.inFlight.Add(1)
   p.input <- &ProducerMessage{flags: shutdown}

   p.inFlight.Wait()

   err := p.client.Close()
   if err != nil {
      Logger.Println("producer/shutdown failed to close the embedded client:", err)
   }

   close(p.input)
   close(p.retries)
   close(p.errors)
   close(p.successes)
}

整理

回顾下问题
• 如何发送,是否还是依然partition为粒度?
先从topic发送者拆分成partition发送者,再收拢为broker发送者,确实达到过partition粒度
• 如何控制阻塞,跟非阻塞发送者有什么区别?
用结果的接收同步阻塞,跟非阻塞的发送者只是读取结果时机不同而已
• 释放资源时都做了什么?
外层wg主要用来回收成功结果处理和error结果处理两个goroutine
内部wg主要用来确保消息的处理正常完成
还有就是对于各个层级发送者和其调度器的回收
• 如何处理错误(返回值),是否还是channel异步获取?
依然是异步获取

发现bug两则

  • 关闭还有可能继续接受消息的producer的回执消息处理chan 导致 panic
  • fin消息出现在需要重试发送消息时,但fin消息不会触发重连broker的逻辑。此时恰好遇到broker断连需要更新,则会阻塞住,无法继续发送消息

总结

sarama库确实不亏是一个优秀的项目,很多地方值得我们学习,但是同时也有用数量判断是否需要Rebalance的"有风险"的机制存在,所以我们也不应该全部接纳,还是要保持自己的思考,取其精华为上

单体消费者和异步发送者暂不做阅读,主体的机制是类似的。

引用

Sarama 源码阅读(一): 客户端是如何与 broker 通信的
Go基础系列:nil channel用法示例