Go Ops Development: Log Collection (9) — logTransfer Supports Multiple Topics
So far logTransfer can ship log messages from Kafka into ES, but it handles only a single topic, and that topic is hard-coded in the config file — clearly not what we want. The goal is to pull the configuration from etcd, where there may be several topics, and to watch etcd for configuration changes.
Kafka consumer groups
A Consumer Group is Kafka's scalable, fault-tolerant consumer mechanism.
Consumer Groups have three key properties:
- A Consumer Group can contain one or more Consumer instances.
- A Group ID, which is a string, uniquely identifies a Consumer Group.
- Within a group, each partition of a subscribed topic is assigned to exactly one Consumer instance. The same partition can still be consumed by other groups.
The payoff of using a consumer group is that we can set the initial offset to sarama.OffsetOldest and still rely on the group to remember how far it has consumed: whether logTransfer is later restarted or several instances are started, consumption resumes from the last committed offset, so we don't have to worry about collecting data twice or missing it.
Kafka consumer group example
```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

var (
	brokers = []string{"172.16.10.10:9092"}
	group   = "my-group"
	topics  = []string{"system_log"}
	version = sarama.V0_10_2_0
)

func main() {
	config := sarama.NewConfig()
	config.Version = version
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(brokers, group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	consumer := Consumer{
		ready: make(chan bool),
	}
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop: when a
			// server-side rebalance happens, the consumer session needs to be
			// recreated to get the new claims
			if err := client.Consume(ctx, topics, &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	}

	return nil
}
```
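One detail worth calling out: session.MarkMessage does not commit the offset immediately; it marks the message as processed, and sarama's offset manager commits marked offsets back to Kafka periodically in the background. Those committed offsets are what let a restarted (or newly added) group member resume where the group left off.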
On the first run, the program fetches from the earliest messages in Kafka (because of OffsetOldest); once it has caught up, restarting it continues from the latest committed position rather than re-reading everything from the beginning.
Bringing the consumer group into logTransfer
We rework the kafka package and add a consumer group setting to the config file.
conf/config.ini
```ini
[kafka]
address = 172.16.10.10:9092
topic = system_log,web_log
group = logCollect
```
Update the struct in the conf/config.go file
```go
type KafkaConf struct {
	Address string `ini:"address"`
	Topic   string `ini:"topic"`
	Group   string `ini:"group"`
}
```
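As a side note, this is roughly how the [kafka] section maps onto that struct. A minimal, self-contained sketch assuming the gopkg.in/ini.v1 package (consistent with the `ini:"..."` tags); the real project wraps KafkaConf in a larger config struct, as the cfg.KafkaConf references in main below suggest:

```go
package main

import (
	"fmt"

	"gopkg.in/ini.v1" // assumed INI library; any mapper honoring the `ini` tags works
)

// KafkaConf mirrors the [kafka] section of conf/config.ini.
type KafkaConf struct {
	Address string `ini:"address"`
	Topic   string `ini:"topic"`
	Group   string `ini:"group"`
}

func main() {
	file, err := ini.Load("conf/config.ini")
	if err != nil {
		panic(err)
	}
	var kc KafkaConf
	// MapTo fills the struct fields from the section's keys via the ini tags.
	if err := file.Section("kafka").MapTo(&kc); err != nil {
		panic(err)
	}
	fmt.Println(kc.Address, kc.Topic, kc.Group) // 172.16.10.10:9092 system_log,web_log logCollect
}
```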
Update the kafka package
```go
package kafka

import (
	"context"
	"logCollect/logTransfer/es"
	"logCollect/logTransfer/logger"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

// Init sets up the Kafka consumer group connection and feeds the consumed data to ES
func Init(brokers []string, topics, group string) (err error) {
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_2_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(brokers, group, config)
	if err != nil {
		logger.Log.Panicf("Error creating consumer group client: %v", err)
	}

	consumer := Consumer{
		ready: make(chan bool),
	}
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop: when a
			// server-side rebalance happens, the consumer session needs to be
			// recreated to get the new claims
			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
				logger.Log.Panicf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	logger.Log.Info("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		logger.Log.Info("terminating: context cancelled")
	case <-sigterm:
		logger.Log.Info("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		logger.Log.Panicf("Error closing client: %v", err)
	}
	return
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	for message := range claim.Messages() {
		logger.Log.Debugf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		// package up the data for ES
		ld := es.LogData{Topic: message.Topic, Data: string(message.Value)}
		// es.SendToES(topic, ld) // direct function call
		// better: hand it off through a channel instead
		es.SendToChan(&ld)
		session.MarkMessage(message, "")
	}

	return nil
}
```
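The es.LogData and es.SendToChan used above come from the es package built in part 7 of this series. For context, here is a minimal sketch of the hand-off they imply; the channel buffer size and field tags are assumptions, not the project's exact code:

```go
package es

// LogData is the unit of work handed from the Kafka consumer to the ES writer.
type LogData struct {
	Topic string `json:"topic"`
	Data  string `json:"data"`
}

// Buffered channel that decouples Kafka consumption from ES indexing;
// the capacity is an assumption.
var logDataChan = make(chan *LogData, 100000)

// SendToChan enqueues a log entry so ConsumeClaim never blocks on ES I/O;
// background sender goroutines drain logDataChan into Elasticsearch.
func SendToChan(msg *LogData) {
	logDataChan <- msg
}
```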
Update the main entry point
```go
	// 3. initialize the Kafka consumer connection
	err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Topic, cfg.KafkaConf.Group)
	if err != nil {
		logger.Log.Errorf("init kafka failed, err:%v\n", err)
		return
	}
	logger.Log.Debug("init kafka success")
```
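Note that kafka.Init as written blocks until the process receives SIGINT or SIGTERM (the signal handling moved inside it), so it must be the last initialization step in main and the success log line after it is only reached at shutdown; the ES sender goroutines need to be running before it is called.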
Build and test
```bash
go build
./logTransfer
```
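To exercise both topics end to end, you can push a test message into each one. Here is a minimal sarama producer sketch; the broker address and message bodies are just placeholders for whatever your logAgent would normally send:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required by the sync producer

	producer, err := sarama.NewSyncProducer([]string{"172.16.10.10:9092"}, config)
	if err != nil {
		log.Fatalf("create producer failed: %v", err)
	}
	defer producer.Close()

	// Send one test message to each of the configured topics.
	for _, topic := range []string{"system_log", "web_log"} {
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder("test message for " + topic),
		}
		if _, _, err := producer.SendMessage(msg); err != nil {
			log.Fatalf("send to %s failed: %v", topic, err)
		}
	}
}
```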
Complete code
Checking the log file shows that logs from both the web_log and system_log topics are coming through correctly. The next step is to fetch the topics dynamically from etcd.
- Original author: 黄忠德
- Original link: https://huangzhongde.cn/post/2020-03-10-golang_devops_logAgent_9_kafka_consumer_group_multi_topics/
- Copyright: This work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. For non-commercial reproduction, please credit the source (author and original link); for commercial reproduction, please contact the author for permission.