Go运维开发之日志收集(4)监视etcd配置项的变更
Go运维开发之日志收集(4)监视etcd配置项的变更
Go运维开发之日志收集(3)根据etcd配置项创建多个tailTask
在上一篇中我们已经实现了从etcd中获取配置信息并创建tailTask任务,现在我们来通过etcd的watch实现新配置的变更。
实现watch新的配置
etcd/etcd.go
新增WatchConf
函数,用于监视etcd的key的变化,当有变化时通知taskMgr
1// WatchConf 用于监视key的变化
2func WatchConf(key string, newConfCh chan<- []*LogEntry){
3 ch := client.Watch(context.Background(), key)
4 // 从通道尝试取值(监视的信息)
5 for wresp := range ch{
6 for _, evt := range wresp.Events{
7 fmt.Printf("Type:%v key:%v value:%v\n", evt.Type, string(evt.Kv.Key), string(evt.Kv.Value))
8 // 通知taillog.tskMgr
9 // 1. 先判断操作的类型
10 var newConf []*LogEntry
11 if evt.Type != clientv3.EventTypeDelete {
12 // 如果是删除操作,手动传递一个空的配置项
13 err := json.Unmarshal(evt.Kv.Value, &newConf)
14 if err != nil {
15 fmt.Printf("unmarshal failed, err:%v\n", err)
16 continue
17 }
18 }
19 fmt.Printf(" get new conf:%v\n", newConf)
20 newConfCh <- newConf
21 }
22 }
23}
修改taillog/tail_mgr.go
的tailLogMgr结构体,并在Init的时候初始化
1// tailTask 管理者
2type tailLogMgr struct {
3 logEntry []*etcd.LogEntry
4 taskMap map[string]*TailTask // 用于保存tailtask
5 newConfChan chan []*etcd.LogEntry // 获取新的配置的通道
6}
7
8func Init(logEntryConf []*etcd.LogEntry) {
9 taskMgr = &tailLogMgr{
10 logEntry: logEntryConf, // 把当前的日志收集项配置信息保存起来
11 taskMap: make(map[string]*TailTask, 16),
12 newConfChan: make(chan []*etcd.LogEntry), // 无缓冲区的通道
13 }
14
15 for _, logEntry := range logEntryConf {
16 //conf: *etcd.LogEntry
17 //logEntry.Path: 要收集的日志文件的路径
18 NewTailTask(logEntry.Path, logEntry.Topic)
19 }
20 go taskMgr.run()
21}
22
23// 监听自己的newConfChan,有了新的配置过来之后就做对应的处理
24
25func (t *tailLogMgr) run() {
26 for {
27 select {
28 case newConf := <-t.newConfChan:
29 // 1.配置新增
30 // 2.配置删除
31 // 3.配置变更
32 fmt.Println("新的配置来了!", newConf)
33 default:
34 time.Sleep(time.Second)
35 }
36 }
37}
38
39// 一个函数,向外暴露taskMgr的newConfChan
40func NewConfChan() chan<- []*etcd.LogEntry {
41 return taskMgr.newConfChan
42}
在main.go
初始化之后开始监视etcd指定key的变更
1var (
2 cfg = new(conf.AppConf)
3 wg sync.WaitGroup
4)
5
6 // 3. 收集日志发往Kafka
7 taillog.Init(logEntryConf)
8 // 因为NewConfChan访问了tskMgr的newConfChan, 这个channel是在taillog.Init(logEntryConf) 执行的初始化
9 newConfChan := taillog.NewConfChan() // 从taillog包中获取对外暴露的通道
10 wg.Add(1)
11 go etcd.WatchConf(cfg.EtcdConf.Key, newConfChan) // 哨兵发现最新的配置信息会通知上面的那个通道
12 wg.Wait()
测试验证
1go build
2./logAgent
3init kafka success
4init etcd success.
5get conf from etcd success, [0xc0001918c0]
6index:0 value:&{/var/log/nginx/access.log web_log}
72020/03/04 16:08:06 Waiting for /var/log/nginx/access.log to appear...
尝试从etcdctl客户端修改配置信息
1etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"/var/log/messages","topic":"system_log"}]'
2OK
3etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"./mysql.log","topic":"mysql_log"}]'
4OK
查看运行日志
1Type:PUT key:/logagent/collect value:[{"path":"/var/log/messages","topic":"system_log"}]
2 get new conf:[0xc00000f720]
3新的配置来了! [0xc00000f720]
4Type:PUT key:/logagent/collect value:[{"path":"./mysql.log","topic":"mysql_log"}]
5 get new conf:[0xc00000fd80]
6新的配置来了! [0xc00000fd80]
已经实现了新配置的监视,下面来通过获取的配置信息作对应的处理
实现新增收集任务
现在我们来改造tailogMgr的run方法
我们将taillog/tail_mgr.go
的task保存到taskMap
1func Init(logEntryConf []*etcd.LogEntry) {
2 taskMgr = &tailLogMgr{
3 logEntry: logEntryConf, // 把当前的日志收集项配置信息保存起来
4 taskMap: make(map[string]*TailTask, 16),
5 newConfChan: make(chan []*etcd.LogEntry), // 无缓冲区的通道
6 }
7
8 for _, logEntry := range logEntryConf {
9 //conf: *etcd.LogEntry
10 //logEntry.Path: 要收集的日志文件的路径
11 // 初始化的时候起了多少个tailtask 都要记下来,为了后续判断方便
12 tailObj := NewTailTask(logEntry.Path, logEntry.Topic)
13 mk := fmt.Sprintf("%s_%s", logEntry.Path, logEntry.Topic)
14 taskMgr.taskMap[mk] = tailObj
15 }
16 go taskMgr.run()
17}
然后修改run方法,对每次配置发生变更与保存起来的taskMap进行比较
1func (t *tailLogMgr) run() {
2 for {
3 select {
4 case newConf := <-t.newConfChan:
5 for _, conf := range newConf {
6 mk := fmt.Sprintf("%s_%s", conf.Path, conf.Topic)
7 _, ok := t.taskMap[mk]
8 if ok {
9 // 原来就有,不需要操作
10 continue
11 } else {
12 // 新增的
13 tailObj := NewTailTask(conf.Path, conf.Topic)
14 t.taskMap[mk] = tailObj
15 }
16 }
17 fmt.Println("新的配置来了!", newConf)
18 default:
19 time.Sleep(time.Second)
20 }
21 }
22}
重新构建测试
1go build
2./logAgent
3init kafka success
4init etcd success.
5get conf from etcd success, [0xc0003622a0]
6index:0 value:&{./mysql.log mysql_log}
72020/03/04 16:32:16 Waiting for ./mysql.log to appear...
82020/03/04 16:32:16 Waiting for ./mysql.log to appear...
我修改etcd的key值,实现新增
1etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"./my.log","topic":"web_log"}]'
2OK
3echo "test log for web_log" >> my.log
日志如下
1Type:PUT key:/logagent/collect value:[{"path":"./my.log","topic":"web_log"}]
2 get new conf:[0xc0000bfb80]
3新的配置来了! [0xc0000bfb80]
4get log data from ./my.log success, log:test log for web_log
5pid:0 offset:0
我们检查下有没有写入到kafka中
1docker exec kafka kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic web_log --from-beginning
2test log for web_log
已经能够正常从kafka获取到数据说明推送正常。
实现删除收集任务
由于某种原因,需要将日志收集的topic变更或者不再需要收集,这个时候就需要删除之前的收集任务了。
继续修改taillog/tail_mgr.go
中的run的逻辑
1func (t *tailLogMgr) run() {
2 for {
3 select {
4 case newConf := <-t.newConfChan:
5 for _, conf := range newConf {
6 mk := fmt.Sprintf("%s_%s", conf.Path, conf.Topic)
7 _, ok := t.taskMap[mk]
8 if ok {
9 // 原来就有,不需要操作
10 continue
11 } else {
12 // 新增的
13 tailObj := NewTailTask(conf.Path, conf.Topic)
14 t.taskMap[mk] = tailObj
15 }
16 }
17 // 2. 配置删除
18 // 找出原来t.logEntry有,但是newConf中没有的,要删掉
19 for _, c1 := range t.logEntry { // 从原来的配置中依次拿出配置项
20 isDelete := true
21 for _, c2 := range newConf { // 去新的配置中逐一进行比较
22 if c2.Path == c1.Path && c2.Topic == c1.Topic {
23 isDelete = false
24 continue
25 }
26 }
27 if isDelete {
28 // 把c1对应的这个tailObj给停掉
29 mk := fmt.Sprintf("%s_%s", c1.Path, c1.Topic)
30 //t.tskMap[mk] ==> tailObj
31 t.taskMap[mk].cancelFunc()
32 }
33 }
34 fmt.Println("新的配置来了!", newConf)
35 default:
36 time.Sleep(time.Second)
37 }
38 }
39}
对taillog/tail.go
进行改造,实现退出功能
1// TailTask: 一个日志收集的任务
2type TailTask struct {
3 path string
4 topic string
5 instance *tail.Tail
6 // 为了能实现退出t.run()
7 ctx context.Context
8 cancelFunc context.CancelFunc
9}
10
11func NewTailTask(path, topic string) (tailObj *TailTask) {
12 ctx, cancel := context.WithCancel(context.Background())
13 tailObj = &TailTask{
14 path: path,
15 topic: topic,
16 ctx: ctx,
17 cancelFunc: cancel,
18 }
19 tailObj.init() // 根据路径去打开对应的日志
20 return
21}
22
23func (t *TailTask) run() {
24 for {
25 select {
26 case <-t.ctx.Done():
27 fmt.Printf("tail task:%s_%s 结束了...\n", t.path, t.topic)
28 return
29 case line := <-t.instance.Lines: // 从tailObj的通道中一行一行的读取日志数据
30 // 3.2 发往Kafka
31 //kafka.SendToKafka(t.topic, line.Text) // 函数调用函数
32 // 先把日志数据发到一个通道中
33 fmt.Printf("get log data from %s success, log:%v\n", t.path, line.Text)
34 kafka.SendToChan(t.topic, line.Text)
35 // kafka那个包中有单独的goroutine去取日志数据发到kafka
36 }
37 }
38}
构建测试删除
1go build
2./logAgent
3./logAgent
4init kafka success
5init etcd success.
6get conf from etcd success, [0xc0000b1700]
7index:0 value:&{./my.log web_log}
82020/03/04 17:54:24 Waiting for ./my.log to appear...
先新增任务
1etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"/var/log/messages","topic":"system_log"},{"path":"./my.log", "topic": "mysql_log"}]'
之前的my.log是写入web_log的,我们已经看到my.log换成mysql_log这个topic了
1Type:PUT key:/logagent/collect value:[{"path":"/var/log/messages","topic":"system_log"},{"path":"./my.log", "topic": "mysql_log"}]
2 get new conf:[0xc00020c420 0xc00020c440]
3新的配置来了! [0xc00020c420 0xc00020c440]
42020/03/04 17:56:27 Seeked ./my.log - &{Offset:0 Whence:2}
52020/03/04 17:56:27 Waiting for /var/log/messages to appear...
现在我们尝试在my.log写入数据,测试看日志写入到哪个topic中了
1date >> my.log
2date >> my.log
3date >> my.log
查看kafka
1bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mysql_log --from-beginning
22020年 3月 4日 星期三 17时58分49秒 CST
32020年 3月 4日 星期三 17时58分50秒 CST
42020年 3月 4日 星期三 17时58分51秒 CST
查看应用程序日志,也已经换成mysql_log这个topic了
1Type:PUT key:/logagent/collect value:[{"path":"./my.log","topic":"mysql_log"}]
2 get new conf:[0xc0003b6160]
3新的配置来了! [0xc0003b6160]
4tail task:./my.log_web_log 结束了...
52020/03/04 17:57:44 Seeked ./my.log - &{Offset:0 Whence:2}
好了,任务收集这块已经正常了。
Github代码地址:logAgent
- 原文作者:黄忠德
- 原文链接:https://huangzhongde.cn/post/2020-03-04-golang_devops_logAgent_4_watch_config_from_etcd/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。