Go运维开发之日志收集(4)监视etcd配置项的变更

Go运维开发之日志收集(1)收集应用程序日志到kafka中

Go运维开发之日志收集(2)从etcd中获取配置信息

Go运维开发之日志收集(3)根据etcd配置项创建多个tailTask

在上一篇中我们已经实现了从etcd中获取配置信息并创建tailTask任务,现在我们来通过etcd的watch实现新配置的变更。

实现watch新的配置

etcd/etcd.go新增WatchConf函数,用于监视etcd的key的变化,当有变化时通知taskMgr

 1// WatchConf 用于监视key的变化
 2func WatchConf(key string, newConfCh chan<- []*LogEntry){
 3	ch := client.Watch(context.Background(), key)
 4	// 从通道尝试取值(监视的信息)
 5	for wresp := range ch{
 6		for _, evt := range wresp.Events{
 7			fmt.Printf("Type:%v key:%v value:%v\n", evt.Type, string(evt.Kv.Key), string(evt.Kv.Value))
 8			// 通知taillog.tskMgr
 9			// 1. 先判断操作的类型
10			var newConf []*LogEntry
11			if evt.Type != clientv3.EventTypeDelete {
12				// 如果是删除操作,手动传递一个空的配置项
13				err := json.Unmarshal(evt.Kv.Value, &newConf)
14				if err != nil {
15					fmt.Printf("unmarshal failed, err:%v\n", err)
16					continue
17				}
18			}
19			fmt.Printf(" get new conf:%v\n", newConf)
20			newConfCh <- newConf
21		}
22	}
23}

修改taillog/tail_mgr.go的tailLogMgr结构体,并在Init的时候初始化

 1// tailTask 管理者
 2type tailLogMgr struct {
 3	logEntry    []*etcd.LogEntry
 4	taskMap     map[string]*TailTask  // 用于保存tailtask
 5	newConfChan chan []*etcd.LogEntry // 获取新的配置的通道
 6}
 7
 8func Init(logEntryConf []*etcd.LogEntry) {
 9	taskMgr = &tailLogMgr{
10		logEntry:    logEntryConf, // 把当前的日志收集项配置信息保存起来
11		taskMap:      make(map[string]*TailTask, 16),
12		newConfChan: make(chan []*etcd.LogEntry), // 无缓冲区的通道
13	}
14
15	for _, logEntry := range logEntryConf {
16		//conf: *etcd.LogEntry
17		//logEntry.Path: 要收集的日志文件的路径
18		NewTailTask(logEntry.Path, logEntry.Topic)
19	}
20	go taskMgr.run()
21}
22
23// 监听自己的newConfChan,有了新的配置过来之后就做对应的处理
24
25func (t *tailLogMgr) run() {
26	for {
27		select {
28		case newConf := <-t.newConfChan:
29			// 1.配置新增
30      // 2.配置删除
31      // 3.配置变更
32			fmt.Println("新的配置来了!", newConf)
33		default:
34			time.Sleep(time.Second)
35		}
36	}
37}
38
39// 一个函数,向外暴露taskMgr的newConfChan
40func NewConfChan() chan<- []*etcd.LogEntry {
41	return taskMgr.newConfChan
42}

main.go初始化之后开始监视etcd指定key的变更

 1var (
 2	cfg = new(conf.AppConf)
 3	wg  sync.WaitGroup
 4)
 5
 6  // 3. 收集日志发往Kafka
 7	taillog.Init(logEntryConf)
 8	// 因为NewConfChan访问了tskMgr的newConfChan, 这个channel是在taillog.Init(logEntryConf) 执行的初始化
 9	newConfChan := taillog.NewConfChan() // 从taillog包中获取对外暴露的通道
10	wg.Add(1)
11	go etcd.WatchConf(cfg.EtcdConf.Key, newConfChan) // 哨兵发现最新的配置信息会通知上面的那个通道
12	wg.Wait()

测试验证

1go build
2./logAgent
3init kafka success
4init etcd success.
5get conf from etcd success, [0xc0001918c0]
6index:0 value:&{/var/log/nginx/access.log web_log}
72020/03/04 16:08:06 Waiting for /var/log/nginx/access.log to appear...

尝试从etcdctl客户端修改配置信息

1etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"/var/log/messages","topic":"system_log"}]'
2OK
3etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"./mysql.log","topic":"mysql_log"}]'
4OK

查看运行日志

1Type:PUT key:/logagent/collect value:[{"path":"/var/log/messages","topic":"system_log"}]
2 get new conf:[0xc00000f720]
3新的配置来了! [0xc00000f720]
4Type:PUT key:/logagent/collect value:[{"path":"./mysql.log","topic":"mysql_log"}]
5 get new conf:[0xc00000fd80]
6新的配置来了! [0xc00000fd80]

已经实现了新配置的监视,下面来通过获取的配置信息作对应的处理

实现新增收集任务

现在我们来改造tailogMgr的run方法

我们将taillog/tail_mgr.go的task保存到taskMap

 1func Init(logEntryConf []*etcd.LogEntry) {
 2	taskMgr = &tailLogMgr{
 3		logEntry:    logEntryConf, // 把当前的日志收集项配置信息保存起来
 4		taskMap:     make(map[string]*TailTask, 16),
 5		newConfChan: make(chan []*etcd.LogEntry), // 无缓冲区的通道
 6	}
 7
 8	for _, logEntry := range logEntryConf {
 9		//conf: *etcd.LogEntry
10		//logEntry.Path: 要收集的日志文件的路径
11		// 初始化的时候起了多少个tailtask 都要记下来,为了后续判断方便
12		tailObj := NewTailTask(logEntry.Path, logEntry.Topic)
13		mk := fmt.Sprintf("%s_%s", logEntry.Path, logEntry.Topic)
14		taskMgr.taskMap[mk] = tailObj
15	}
16	go taskMgr.run()
17}

然后修改run方法,对每次配置发生变更与保存起来的taskMap进行比较

 1func (t *tailLogMgr) run() {
 2	for {
 3		select {
 4		case newConf := <-t.newConfChan:
 5			for _, conf := range newConf {
 6				mk := fmt.Sprintf("%s_%s", conf.Path, conf.Topic)
 7				_, ok := t.taskMap[mk]
 8				if ok {
 9					// 原来就有,不需要操作
10					continue
11				} else {
12					// 新增的
13					tailObj := NewTailTask(conf.Path, conf.Topic)
14					t.taskMap[mk] = tailObj
15				}
16			}
17			fmt.Println("新的配置来了!", newConf)
18		default:
19			time.Sleep(time.Second)
20		}
21	}
22}

重新构建测试

1go build
2./logAgent
3init kafka success
4init etcd success.
5get conf from etcd success, [0xc0003622a0]
6index:0 value:&{./mysql.log mysql_log}
72020/03/04 16:32:16 Waiting for ./mysql.log to appear...
82020/03/04 16:32:16 Waiting for ./mysql.log to appear...

我修改etcd的key值,实现新增

1etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"./my.log","topic":"web_log"}]'
2OK
3echo "test log for web_log" >> my.log

日志如下

1Type:PUT key:/logagent/collect value:[{"path":"./my.log","topic":"web_log"}]
2 get new conf:[0xc0000bfb80]
3新的配置来了! [0xc0000bfb80]
4get log data from ./my.log success, log:test log for web_log
5pid:0 offset:0

我们检查下有没有写入到kafka中

1docker exec kafka kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic web_log --from-beginning
2test log for web_log

已经能够正常从kafka获取到数据说明推送正常。

实现删除收集任务

由于某种原因,需要将日志收集的topic变更或者不再需要收集,这个时候就需要删除之前的收集任务了。

继续修改taillog/tail_mgr.go中的run的逻辑

 1func (t *tailLogMgr) run() {
 2	for {
 3		select {
 4		case newConf := <-t.newConfChan:
 5			for _, conf := range newConf {
 6				mk := fmt.Sprintf("%s_%s", conf.Path, conf.Topic)
 7				_, ok := t.taskMap[mk]
 8				if ok {
 9					// 原来就有,不需要操作
10					continue
11				} else {
12					// 新增的
13					tailObj := NewTailTask(conf.Path, conf.Topic)
14					t.taskMap[mk] = tailObj
15				}
16			}
17      // 2. 配置删除
18			// 找出原来t.logEntry有,但是newConf中没有的,要删掉
19			for _, c1 := range t.logEntry { // 从原来的配置中依次拿出配置项
20				isDelete := true
21				for _, c2 := range newConf { // 去新的配置中逐一进行比较
22					if c2.Path == c1.Path && c2.Topic == c1.Topic {
23						isDelete = false
24						continue
25					}
26				}
27				if isDelete {
28					// 把c1对应的这个tailObj给停掉
29					mk := fmt.Sprintf("%s_%s", c1.Path, c1.Topic)
30					//t.tskMap[mk] ==> tailObj
31					t.taskMap[mk].cancelFunc()
32				}
33			}
34			fmt.Println("新的配置来了!", newConf)
35		default:
36			time.Sleep(time.Second)
37		}
38	}
39}

taillog/tail.go进行改造,实现退出功能

 1// TailTask: 一个日志收集的任务
 2type TailTask struct {
 3	path     string
 4	topic    string
 5	instance *tail.Tail
 6	// 为了能实现退出t.run()
 7	ctx        context.Context
 8	cancelFunc context.CancelFunc
 9}
10
11func NewTailTask(path, topic string) (tailObj *TailTask) {
12	ctx, cancel := context.WithCancel(context.Background())
13	tailObj = &TailTask{
14		path:       path,
15		topic:      topic,
16		ctx:        ctx,
17		cancelFunc: cancel,
18	}
19	tailObj.init() // 根据路径去打开对应的日志
20	return
21}
22
23func (t *TailTask) run() {
24	for {
25		select {
26		case <-t.ctx.Done():
27			fmt.Printf("tail task:%s_%s 结束了...\n", t.path, t.topic)
28			return
29		case line := <-t.instance.Lines: // 从tailObj的通道中一行一行的读取日志数据
30			// 3.2 发往Kafka
31			//kafka.SendToKafka(t.topic, line.Text) // 函数调用函数
32			// 先把日志数据发到一个通道中
33			fmt.Printf("get log data from %s success, log:%v\n", t.path, line.Text)
34			kafka.SendToChan(t.topic, line.Text)
35			// kafka那个包中有单独的goroutine去取日志数据发到kafka
36		}
37	}
38}

构建测试删除

1go build
2./logAgent
3./logAgent
4init kafka success
5init etcd success.
6get conf from etcd success, [0xc0000b1700]
7index:0 value:&{./my.log web_log}
82020/03/04 17:54:24 Waiting for ./my.log to appear...

先新增任务

1etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"/var/log/messages","topic":"system_log"},{"path":"./my.log", "topic": "mysql_log"}]'

之前的my.log是写入web_log的,我们已经看到my.log换成mysql_log这个topic了

1Type:PUT key:/logagent/collect value:[{"path":"/var/log/messages","topic":"system_log"},{"path":"./my.log", "topic": "mysql_log"}]
2 get new conf:[0xc00020c420 0xc00020c440]
3新的配置来了! [0xc00020c420 0xc00020c440]
42020/03/04 17:56:27 Seeked ./my.log - &{Offset:0 Whence:2}
52020/03/04 17:56:27 Waiting for /var/log/messages to appear...

现在我们尝试在my.log写入数据,测试看日志写入到哪个topic中了

1date >> my.log
2date >> my.log
3date >> my.log

查看kafka

1bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mysql_log --from-beginning
22020年 3月 4日 星期三 17时58分49秒 CST
32020年 3月 4日 星期三 17时58分50秒 CST
42020年 3月 4日 星期三 17时58分51秒 CST

查看应用程序日志,也已经换成mysql_log这个topic了

1Type:PUT key:/logagent/collect value:[{"path":"./my.log","topic":"mysql_log"}]
2 get new conf:[0xc0003b6160]
3新的配置来了! [0xc0003b6160]
4tail task:./my.log_web_log 结束了...
52020/03/04 17:57:44 Seeked ./my.log - &{Offset:0 Whence:2}

好了,任务收集这块已经正常了。

Github代码地址:logAgent