Go Ops Development: Log Collection (9) - logTransfer Supports Multiple Topics

Go Ops Development: Log Collection (1) - Collecting Application Logs into Kafka

Go Ops Development: Log Collection (2) - Fetching Configuration from etcd

Go Ops Development: Log Collection (3) - Creating Multiple tailTasks from etcd Config Entries

Go Ops Development: Log Collection (4) - Watching for etcd Config Changes

Go Ops Development: Log Collection (5) - Pulling Log Collection Config by IP Address

Go Ops Development: Log Collection (6) - Fetching Log Messages from Kafka

Go Ops Development: Log Collection (7) - Writing Logs into Elasticsearch and Visualizing Them with Kibana

Go Ops Development: Log Collection (8) - Writing Application Logs to a File

So far, logTransfer can already ship log messages from Kafka into ES, but it handles only one topic, and that topic is hard-coded in the configuration file, which clearly falls short of what we want. The goal is to fetch the configuration from etcd, support multiple topics, and watch etcd for configuration changes; in this post we optimize logTransfer toward that goal.

Kafka consumer groups

A Consumer Group is Kafka's scalable, fault-tolerant consumer mechanism.

A Consumer Group has three key properties:

  • A Consumer Group can contain one or more Consumer instances.
  • A Group ID, which is a string, uniquely identifies a Consumer Group.
  • Within a group, each partition of a subscribed topic is assigned to exactly one Consumer instance. The same partition can, however, still be consumed by other groups.

The benefit of using a consumer group is that even with the initial offset set to sarama.OffsetOldest, Kafka tracks the group's committed offset: whether we later restart logTransfer or run multiple logTransfer instances, consumption resumes from where the group left off. There is no need to worry about duplicated or missed data.

Kafka consumer group example

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

var (
	brokers = []string{"172.16.10.10:9092"}
	group   = "my-group"
	topics  = []string{"system_log"}
	version = sarama.V0_10_2_0
)

func main() {
	config := sarama.NewConfig()
	config.Version = version
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(brokers, group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	consumer := Consumer{
		ready: make(chan bool),
	}
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop: when a
			// server-side rebalance happens, the consumer session needs to be
			// recreated to get the new claims
			if err := client.Consume(ctx, topics, &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	}

	return nil
}

On the first run it fetches the oldest messages in Kafka; once it has caught up, subsequent restarts continue from the latest committed offset.
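Behind the scenes, session.MarkMessage only marks an offset as processed; sarama commits marked offsets to Kafka in the background. A minimal sketch of the relevant knobs, assuming a sarama release new enough (roughly v1.26+) to expose Consumer.Offsets.AutoCommit; older releases used Consumer.Offsets.CommitInterval instead:

package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func newGroupConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_2_0
	// OffsetOldest only applies when the group has no committed offset yet;
	// once offsets exist, the group always resumes from its last committed position.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	// Background commit of marked offsets (these are the library defaults,
	// shown here for clarity):
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
	return config
}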

Adopting the consumer group in logTransfer

We rework the kafka package and add a consumer group setting to the configuration file.

conf/config.ini

[kafka]
address = 172.16.10.10:9092
topic = system_log,web_log
group = logCollect

Update the struct in the conf/config.go file:

type KafkaConf struct {
	Address string `ini:"address"`
	Topic   string `ini:"topic"`
	Group   string `ini:"group"`
}
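The `ini` struct tags suggest the config is mapped with gopkg.in/ini.v1. As a minimal sketch of how such a file could be loaded, assuming that library; the Config wrapper and Load helper here are hypothetical, only KafkaConf comes from the project:

package conf

import "gopkg.in/ini.v1"

// Config is a hypothetical top-level wrapper for illustration; the project's
// actual config type may differ.
type Config struct {
	KafkaConf KafkaConf `ini:"kafka"`
}

// Load maps config.ini onto the struct using gopkg.in/ini.v1.
func Load(path string) (*Config, error) {
	cfg := new(Config)
	if err := ini.MapTo(cfg, path); err != nil {
		return nil, err
	}
	return cfg, nil
}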

Update the kafka package:

package kafka

import (
	"context"
	"logCollect/logTransfer/es"
	"logCollect/logTransfer/logger"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

// Init initializes the Kafka consumer group and ships messages to ES
func Init(brokers []string, topics, group string) (err error) {
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_2_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(brokers, group, config)
	if err != nil {
		logger.Log.Panicf("Error creating consumer group client: %v", err)
	}

	consumer := Consumer{
		ready: make(chan bool),
	}
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop: when a
			// server-side rebalance happens, the consumer session needs to be
			// recreated to get the new claims
			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
				logger.Log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	logger.Log.Info("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		logger.Log.Info("terminating: context cancelled")
	case <-sigterm:
		logger.Log.Info("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		logger.Log.Panicf("Error closing client: %v", err)
	}
	return
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		logger.Log.Debugf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		// Build the payload for ES
		ld := es.LogData{Topic: message.Topic, Data: string(message.Value)}
		// es.SendToES(topic, ld) // direct function call
		// Optimization: push onto a channel instead
		es.SendToChan(&ld)
		session.MarkMessage(message, "")
	}

	return nil
}
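For reference, the es side of this handoff is just a buffered channel feeding the writer built in part (7). Since that code is not repeated here, the following is only a sketch of the shape ConsumeClaim relies on; the channel name and buffer size are assumptions:

package es

// LogData carries one Kafka message toward Elasticsearch; Topic doubles as the index name.
type LogData struct {
	Topic string
	Data  string
}

// logDataChan is the buffered handoff between the kafka consumer and the ES writer
// goroutine (hypothetical name and capacity).
var logDataChan = make(chan *LogData, 100000)

// SendToChan enqueues a message; it does not block the consumer loop
// as long as the buffer has room.
func SendToChan(msg *LogData) {
	logDataChan <- msg
}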

Update the main entry point:

	// 3. Initialize the Kafka connection
	err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Topic, cfg.KafkaConf.Group)
	if err != nil {
		logger.Log.Errorf("init kafka failed, err:%v\n", err)
		return
	}
	logger.Log.Debug("init kafka success")
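One thing to keep in mind: kafka.Init now blocks until a termination signal arrives (the sigterm select lives inside it), so the "init kafka success" line only prints during shutdown. If main ever needs to do further work after initialization, running Init in its own goroutine would be one option.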

Build and test

go build
./logTransfer

Full code

logCollect

Checking the log file shows that messages from both the web_log and system_log topics are coming through correctly. The next step is to fetch the topics dynamically from etcd.