Go运维开发之日志收集(3)根据etcd配置项创建多个tailTask
Go运维开发之日志收集(3)根据etcd配置项创建多个tailTask
在上一篇中我们已经实现了从etcd中获取配置信息
下一步就是拿着这些配置项进行日志收集,之前的taillog的Init函数只是初始化一个tailObj用的。我们来改造它。
根据etcd的配置项创建多个tailtask
我们想要实现的是
/var/log/nginx/access.log
日志文件发送到kafka的web_log
中
/var/log/messages
日志文件发送到kafka的system_log
中…
循环每一个日志收集项,创建tailObj,之前的tailObj是全局的,之前的Init也就用不到了
taillog/tail.go
1// TailTask: 一个日志收集的任务
2type TailTask struct {
3 path string
4 topic string
5 instance *tail.Tail
6}
7
8func NewTailTask(path, topic string) (tailObj *TailTask){
9 tailObj = &TailTask{
10 path:path,
11 topic:topic,
12 }
13 tailObj.init() // 根据路径去打开对应的日志
14 return
15}
16
17// 初始化一个TailTask实例
18func (t *TailTask)init(){
19 config := tail.Config{
20 ReOpen: true, // 重新打开
21 Follow: true, // 是否跟随
22 Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件的哪个地方开始读
23 MustExist: false, // 文件不存在不报错
24 Poll: true,
25 }
26 var err error
27 t.instance, err = tail.TailFile(t.path, config)
28 if err != nil {
29 fmt.Println("tail file failed, err:", err)
30 }
31
32 go t.run() // 直接去采集日志发送到kafka
33}
34
35func (t *TailTask)run(){
36 for {
37 select {
38 case line := <- t.instance.Lines :// 从tailObj的通道中一行一行的读取日志数据
39 // 3.2 发往Kafka
40 kafka.SendToKafka(t.topic, line.Text) // 函数调用函数
41 }
42 }
43}
创建一个用来管理tailTask的管理者taillog/tail_mgr.go
,并创建Init函数
taillog/tail_mgr.go
1package taillog
2
3import "logAgent/etcd"
4
5var taskMgr *tailLogMgr
6
7type tailLogMgr struct {
8 logEntry []*etcd.LogEntry
9 //taskMap map[string]*TailTask
10}
11
12func Init(logEntryConf []*etcd.LogEntry) {
13 taskMgr = &tailLogMgr{
14 logEntry: logEntryConf, // 把当前的日志收集项配置信息保存起来
15 }
16
17 // 遍历配置项
18 for _, logEntry := range logEntryConf {
19 //conf: *etcd.LogEntry
20 //logEntry.Path: 要收集的日志文件的路径
21 NewTailTask(logEntry.Path, logEntry.Topic)
22 }
23}
修改main.go文件,去掉之前的Init调用,改调用新的taillog.Init
函数
1 // 3. 收集日志发往Kafka
2 taillog.Init(logEntryConf)
函数调用函数优化
之前taillog/tail.log
中的run方法直接调用的kafka.SendToKafka
,我们来优化它
将数据放到通道中,从通道中获取信息,写入kafka,修改run函数
taillog/tail.go
1func (t *TailTask) run() {
2 for {
3 select {
4 case line := <-t.instance.Lines: // 从tailObj的通道中一行一行的读取日志数据
5 // 3.2 发往Kafka
6 //kafka.SendToKafka(t.topic, line.Text) // 函数调用函数
7 // 先把日志数据发到一个通道中
8 kafka.SendToChan(t.topic, line.Text)
9 // kafka那个包中有单独的goroutine去取日志数据发到kafka
10 }
11 }
12}
kafka/kafka.go
创建一个全局的chan logDataChan
,保存的结构体logData
信息,在初始化的时候初始化logDataChan
1type logData struct {
2 topic string
3 data string
4}
5
6var (
7 client sarama.SyncProducer // 声明一个全局的连接kafka的生产者client
8 logDataChan chan *logData
9)
10
11// Init 初始化client
12func Init(addrs []string, maxSize int)(err error){
13 config := sarama.NewConfig()
14 // tailf包使⽤
15 config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
16 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个 partition
17 config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
18
19 // 连接kafka
20 client, err = sarama.NewSyncProducer(addrs, config)
21 if err != nil {
22 fmt.Println("producer closed, err:", err)
23 return
24 }
25 // 初始化logDataChan
26 logDataChan = make(chan *logData, maxSize)
27 // 开启后台的goroutine从通道中取取数据发往kafka
28 go sendToKafka()
29 return
30}
31
32// 给外部暴露的一个函数,该函数只把日志数据发送到一个内部的channel中
33func SendToChan(topic, data string){
34 msg := &logData{
35 topic:topic,
36 data:data,
37 }
38 logDataChan <- msg
39}
40
41// 真正往kafka发送日志的函数
42func sendToKafka(){
43 for{
44 select {
45 case ld := <- logDataChan:
46 // 构造⼀个消息
47 msg := &sarama.ProducerMessage{}
48 msg.Topic = ld.topic
49 msg.Value = sarama.StringEncoder(ld.data)
50 // 发送到kafka
51 pid, offset, err := client.SendMessage(msg)
52 if err != nil {
53 fmt.Println("send msg failed, err:", err)
54 return
55 }
56 fmt.Printf("pid:%v offset:%v\n", pid, offset)
57 default:
58 time.Sleep(time.Millisecond*50)
59 }
60
61 }
62
63}
上面用到的maxSize写入到配置文件中,变成可配置项
修改conf/config.ini
增加一个配置项在kafka下
1[kafka]
2address=127.0.0.1:9092
3chan_max_size=100000
修改conf/config.go
的KafkaConf
结构体
1type KafkaConf struct {
2 Address string `ini:"address"`
3 ChanMaxSize int `ini:"chan_max_size"`
4}
修改main.go
,在初始化kafka的时候添加ChanMaxSize
配置项
1 err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.ChanMaxSize)
检查效果
1./logAgent
2init kafka success
3init etcd success.
4get conf from etcd success, [0xc0000921a0]
5index:0 value:&{/var/log/nginx/access.log web_log}
现在已经能够从etcd获取配置信息,开启goroutine,但是没有使用sync.WaitGroup,主函数直接退出了,后面再加。
- 原文作者:黄忠德
- 原文链接:https://huangzhongde.cn/post/2020-03-04-golang_devops_logAgent_3_get_config_from_etcd_create_tailtask/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。