Go运维开发之日志收集(3)根据etcd配置项创建多个tailTask

Go运维开发之日志收集(1)收集应用程序到日志kafka中

Go运维开发之日志收集(2)从etcd中获取配置信息

在上一篇中我们已经实现了从etcd中获取配置信息

下一步就是拿着这些配置项进行日志收集,之前的taillog的Init函数只是初始化一个tailObj用的。我们来改造它。

根据etcd的配置项创建多个tailtask

我们想要实现的是

/var/log/nginx/access.log 日志文件发送到kafka的web_log

/var/log/messages 日志文件发送到kafka的system_log中…

循环每一个日志收集项,创建tailObj,之前的tailObj是全局的,之前的Init也就用不到了

taillog/tail.go

 1// TailTask: 一个日志收集的任务
 2type TailTask struct {
 3	path string
 4	topic string
 5	instance *tail.Tail 
 6}
 7
 8func NewTailTask(path, topic string) (tailObj *TailTask){
 9	tailObj = &TailTask{
10		path:path,
11		topic:topic,
12	}
13	tailObj.init() // 根据路径去打开对应的日志
14	return
15}
16
17// 初始化一个TailTask实例
18func (t *TailTask)init(){
19	config := tail.Config{
20		ReOpen:    true,                                 // 重新打开
21		Follow:    true,                                 // 是否跟随
22		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件的哪个地方开始读
23		MustExist: false,                                // 文件不存在不报错
24		Poll:      true,
25	}
26	var err error
27	t.instance, err = tail.TailFile(t.path, config)
28	if err != nil {
29		fmt.Println("tail file failed, err:", err)
30	}
31
32	go t.run() // 直接去采集日志发送到kafka
33}
34
35func (t *TailTask)run(){
36	for {
37		select {
38		case line := <- t.instance.Lines :// 从tailObj的通道中一行一行的读取日志数据
39			// 3.2 发往Kafka
40			kafka.SendToKafka(t.topic, line.Text) // 函数调用函数
41		}
42	}
43}

创建一个用来管理tailTask的管理者taillog/tail_mgr.go,并创建Init函数

taillog/tail_mgr.go

 1package taillog
 2
 3import "logAgent/etcd"
 4
 5var taskMgr *tailLogMgr
 6
 7type tailLogMgr struct {
 8	logEntry []*etcd.LogEntry
 9	//taskMap map[string]*TailTask
10}
11
12func Init(logEntryConf []*etcd.LogEntry) {
13	taskMgr = &tailLogMgr{
14		logEntry: logEntryConf, // 把当前的日志收集项配置信息保存起来
15	}
16
17	// 遍历配置项
18	for _, logEntry := range logEntryConf {
19		//conf: *etcd.LogEntry
20		//logEntry.Path: 要收集的日志文件的路径
21		NewTailTask(logEntry.Path, logEntry.Topic)
22	}
23}

修改main.go文件,去掉之前的Init调用,改调用新的taillog.Init函数

1	// 3. 收集日志发往Kafka
2	taillog.Init(logEntryConf)

函数调用函数优化

之前taillog/tail.log中的run方法直接调用的kafka.SendToKafka,我们来优化它

将数据放到通道中,从通道中获取信息,写入kafka,修改run函数

taillog/tail.go

 1func (t *TailTask) run() {
 2	for {
 3		select {
 4		case line := <-t.instance.Lines: // 从tailObj的通道中一行一行的读取日志数据
 5			// 3.2 发往Kafka
 6			//kafka.SendToKafka(t.topic, line.Text) // 函数调用函数
 7			// 先把日志数据发到一个通道中
 8			kafka.SendToChan(t.topic, line.Text)
 9			// kafka那个包中有单独的goroutine去取日志数据发到kafka
10		}
11	}
12}

kafka/kafka.go

创建一个全局的chan logDataChan,保存的结构体logData信息,在初始化的时候初始化logDataChan

 1type logData struct {
 2	topic string
 3	data string
 4}
 5
 6var (
 7	client sarama.SyncProducer // 声明一个全局的连接kafka的生产者client
 8	logDataChan chan *logData
 9)
10
11// Init 初始化client
12func Init(addrs []string, maxSize int)(err error){
13	config := sarama.NewConfig()
14	// tailf包使⽤
15	config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
16	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个 partition
17	config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
18
19	// 连接kafka
20	client, err = sarama.NewSyncProducer(addrs, config)
21	if err != nil {
22		fmt.Println("producer closed, err:", err)
23		return
24	}
25	// 初始化logDataChan
26	logDataChan = make(chan *logData, maxSize)
27	// 开启后台的goroutine从通道中取取数据发往kafka
28	go sendToKafka()
29	return
30}
31
32// 给外部暴露的一个函数,该函数只把日志数据发送到一个内部的channel中
33func SendToChan(topic, data string){
34	msg := &logData{
35		topic:topic,
36		data:data,
37	}
38	logDataChan <- msg
39}
40
41// 真正往kafka发送日志的函数
42func sendToKafka(){
43	for{
44		select {
45			case ld := <- logDataChan:
46				// 构造⼀个消息
47				msg := &sarama.ProducerMessage{}
48				msg.Topic = ld.topic
49				msg.Value = sarama.StringEncoder(ld.data)
50				// 发送到kafka
51				pid, offset, err := client.SendMessage(msg)
52				if err != nil {
53					fmt.Println("send msg failed, err:", err)
54					return
55				}
56				fmt.Printf("pid:%v offset:%v\n", pid, offset)
57		default:
58			time.Sleep(time.Millisecond*50)
59		}
60
61	}
62
63}

上面用到的maxSize写入到配置文件中,变成可配置项

修改conf/config.ini增加一个配置项在kafka下

1[kafka]
2address=127.0.0.1:9092
3chan_max_size=100000

修改conf/config.goKafkaConf结构体

1type KafkaConf struct {
2	Address     string `ini:"address"`
3	ChanMaxSize int    `ini:"chan_max_size"`
4}

修改main.go,在初始化kafka的时候添加ChanMaxSize配置项

1	err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.ChanMaxSize)

检查效果

1./logAgent
2init kafka success
3init etcd success.
4get conf from etcd success, [0xc0000921a0]
5index:0 value:&{/var/log/nginx/access.log web_log}

现在已经能够从etcd获取配置信息,开启goroutine,但是没有使用sync.WaitGroup,主函数直接退出了,后面再加。