Go运维开发之日志收集(1)收集应用程序日志到kafka中
Go运维开发之日志收集(1)收集应用程序日志到kafka中
需求背景
每个业务系统都有自己的⽇志,当业务系统出现问题时,需要通过查找⽇志信息来定位和解决问题。 当业务系统服务器⽐较少时,登陆到服务器上查看即可满⾜。但当系统机器规模巨⼤,登陆到服务器上查看⼏乎不现实(分布式的系统,⼀个系统部署在⼗⼏甚至几十台服务器上)
解决方案
把服务器上的⽇志实时收集,统⼀存储到中⼼系统。 再对这些⽇志建⽴索引,通过搜索即可快速找到对应的⽇志记录。 通过提供⼀个界⾯友好的web⻚⾯实现⽇志展示与检索。
业界方案
有早期的ELK到现在的EFK。ELK在每台服务器上部署logstash
,比较重量级,所以演化成客户端部署filebeat
的EFK,由filebeat
收集向logstash
中写数据,最后落地到elasticsearch
,通过kibana
界面进行日志检索。
优缺点
优点:现成的解决方案,直接拿过来用,能够实现日志收集与检索。
缺点:
- 运维成本⾼,每增加⼀个⽇志收集项,都需要⼿动修改配置。
- 监控缺失,⽆法准确获取logstash的状态。⽆法做到定制化开发与维护。
- ⽆法做到定制化开发与维护。
准备开发环境
为了快速开发,这边提供一个docker-compose.yaml
文件,直接启动。
docker-compose.yaml
内容如下:
1version: "3"
2
3networks:
4 app-kafka:
5 driver: bridge
6
7services:
8 zookeeper:
9 container_name: zookeeper
10 image: zookeeper:3.4.14
11 restart: always
12 networks:
13 - app-kafka
14 kafka:
15 container_name: kafka
16 image: bitnami/kafka:2.4.0
17 restart: always
18 environment:
19 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
20 - ALLOW_PLAINTEXT_LISTENER=yes
21 # 后面三条是暴露给外网使用
22 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
23 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
24 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
25 ports:
26 - 127.0.0.1:9092:9092
27 # 外网使用29092进行访问
28 - 127.0.0.1:29092:29092
29 networks:
30 - app-kafka
启动kafka
1docker-compose up -d
kafka依赖zookeeper的,所以有2个镜像,kafka环境变量里面指定了zookeeper的地址。
注意
kafka对外提供服务的时候,需要配置 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
, KAFKA_CFG_LISTENERS
, KAFKA_CFG_ADVERTISED_LISTENERS
三个环境变量,然后增加一个对外提供服务的端口,不然会提示no such host
错误
kafka库的基本操作
这里使用sarama
对kafka
进行操作
gitHub.com/Shopify/sarama
新建一个kafka_demo
的目录,创建一个main.go
文件
1mkdir <GOPATH>/src/kafka_demo
2cd kafka_demo
3go mod init
4ls
5go.mod main.go
main.go
内容如下:
1package main
2
3import (
4 "fmt"
5 "github.com/Shopify/sarama"
6)
7
8// 基于sarama第三⽅库开发的kafka client
9func main() {
10 config := sarama.NewConfig()
11
12 config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
13 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partition
14 config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
15
16 // 构造⼀个消息
17 msg := &sarama.ProducerMessage{}
18 msg.Topic = "web_log"
19 msg.Value = sarama.StringEncoder("this is a test log")
20
21 // 连接kafka
22 client, err := sarama.NewSyncProducer([]string{"127.0.0.1:29092"}, config)
23 if err != nil {
24 fmt.Println("producer closed, err:", err)
25 return
26 }
27
28 defer client.Close()
29 // 发送消息
30 pid, offset, err := client.SendMessage(msg)
31 if err != nil {
32 fmt.Printf("send msg failed, err: %v\n", err)
33 return
34 }
35 fmt.Printf("pid:%v offset:%v\n", pid, offset)
36}
测试能否正常发送到kafka
1go run main.go
2pid:0 offset:0
我们连上kakfa看一下
1docker exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic web_log --from-beginning
2this is a test log
好,能正常接收到消息
tail库的基本操作
这里使用第三方库github.com/hpcloud/tail
新建一个tail_demo
的目录
1mkdir <GOPATH>/src/tail_demo
2cd tail_demo
3go mod init
4ls
5go.mod main.go
main.go
内容如下:
1package main
2
3import (
4 "fmt"
5 "github.com/hpcloud/tail"
6 "time"
7)
8
9// tail demo
10func main() {
11 fileName := "./my.log"
12 config := tail.Config{
13 ReOpen: true,
14 Follow: true,
15 Location: &tail.SeekInfo{Offset: 0, Whence: 2},
16 MustExist: false,
17 Poll: true}
18 tails, err := tail.TailFile(fileName, config)
19 if err != nil {
20 fmt.Println("tail file failed, err:", err)
21 return
22 }
23 var (
24 msg *tail.Line
25 ok bool
26 )
27 for {
28 msg, ok = <-tails.Lines
29 if !ok {
30 fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
31 time.Sleep(time.Second)
32 continue
33 }
34 fmt.Println("msg:", msg.Text)
35 }
36}
运行main.go
1go run main.go
22020/03/03 11:10:01 Waiting for ./my.log to appear...
新建一个my.log
文件,向文件中写入文件
1touch my.log
2echo "this is a test log." >> my.log
程序日志:
12020/03/03 11:14:28 Seeked ./my.log - &{Offset:0 Whence:2}
2msg: this is a test log.
说明tail库运行正常。
logAgent_0.0.1(业务实现)
现在使用tail库能读取到日志,使用sarama
库能到推送消息到kafka,我们结合这两个库,实现一边读取文件日志,一遍写入到kafka。
目录结构
1mkdir -p logAgent/{kafka,taillog}
2ls logAgent
3kafka taillog
4cd logAgent
5go mod init
新建kafka/kafka.go
和taillog/tail.go
,分别先建立一个初始化函数
kafka/kafka.go
1package kafka
2
3import (
4 "fmt"
5
6 "github.com/Shopify/sarama"
7)
8
9var (
10 client sarama.SyncProducer
11)
12
13func Init(addrs []string) (err error) {
14 config := sarama.NewConfig()
15
16 config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
17 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partition
18 config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
19
20 // 连接kafka
21 client, err = sarama.NewSyncProducer([]string{addrs}, config)
22 if err != nil {
23 fmt.Println("producer closed, err:", err)
24 return
25 }
26 return
27}
taillog/tail.go
1package taillog
2
3import (
4 "fmt"
5
6 "github.com/hpcloud/tail"
7)
8
9var (
10 tailObj *tail.Tail
11)
12
13func Init(filename string) (err error) {
14 config := tail.Config{
15 ReOpen: true,
16 Follow: true,
17 Location: &tail.SeekInfo{Offset: 0, Whence: 2},
18 MustExist: false,
19 Poll: true}
20 tailObj, err = tail.TailFile(filename, config)
21 if err != nil {
22 fmt.Println("tail file failed, err:", err)
23 return
24 }
25 return
26}
main.go
内容如下
1package main
2
3import (
4 "fmt"
5 "logAgent/kafka"
6 "logAgent/taillog"
7)
8
9func main() {
10 // 1.初始化kafka
11 err := kafka.Init([]string{"127.0.0.1:29092"})
12 if err != nil {
13 fmt.Printf("init kafka failed ,err:%v\n", err)
14 return
15 }
16 fmt.Println("init kafka success")
17 // 2.初始化taillog
18 err = taillog.Init("./my.log")
19 if err != nil {
20 fmt.Printf("init taillog failed, err:%v\n", err)
21 return
22 }
23 fmt.Println("init taillog success")
24}
都初始化之后,就是怎么将日志发给kafka了。
在taillog/tail.go
中创建一个ReadChan
函数
1func ReadChan() <-chan *tail.Line {
2 return tailObj.Lines
3}
在kafka/kafka.go
中创建一个SendToKafka
的函数,该函数接收从外部提供的topic和data参数
1func SendToKafka(topic, data string) {
2 // 构造⼀个消息
3 msg := &sarama.ProducerMessage{}
4 msg.Topic = topic
5 msg.Value = sarama.StringEncoder(data)
6
7 // 发送消息
8 pid, offset, err := client.SendMessage(msg)
9 if err != nil {
10 fmt.Printf("send msg failed, err: %v\n", err)
11 return
12 }
13 fmt.Printf("pid:%v offset:%v\n", pid, offset)
14}
在main.go
中创建run
函数,执行具体的任务,并在main
函数中调用它
1func run() {
2 for {
3 select {
4 case line := <-taillog.ReadChan():
5 kafka.SendToKafka("web_log", line.Text)
6 default:
7 time.Sleep(time.Millisecond * 500)
8 }
9 }
10}
构建测试一下
1go build
2./logAgent
3init kafka success
4init taillog success
52020/03/03 12:02:09 Waiting for ./my.log to appear...
往my.log中写入一点数据进行测试。
logAgent_0.0.2(引入配置文件)
之前的kafka地址,topic以及日志文件都是写死在应用程序中,不具备移植性,我们通过配置文件将其变为可配置的。
解析配置文件的库有很多,格式也有很多:ini,toml,yaml等,这里用到无闻大神的ini库gopkg.in/ini.v1
新建一个conf
目录,并建立config.ini
配置文件
conf/config.ini
文件内容如下:
1[kafka]
2address = 127.0.0.1:29092
3topic = web_log
4
5[tail]
6filename = my.log
新建一个结构体文件config.go
,用于映射这些字段
conf/config.go
内容如下:
1package conf
2
3type AppConf struct{
4 KafkaConf `ini:"kafka"`
5 TailConf `ini:"tail"`
6}
7
8type KafkaConf struct{
9 Address string `ini:"address"`
10 Topic string `ini:"topic"`
11}
12
13type TailConf struct{
14 Filename string `ini:"filename"`
15}
接下来就是怎么映射这些配置信息到ini配置文件
修改main.go
添加一个变量cfg
并使用ini.MapTo
方法
1var (
2 cfg = new(conf.AppConf)
3)
4
5// 在main函数开始的时候加载配置文件
6// 0.加载配置文件
7 err := ini.MapTo(cfg, "conf/config.ini")
8 if err != nil {
9 fmt.Printf("load conf failed, err:%v\n", err)
10 return
11 }
然后,将之前写死的地方用cfg
下的结构体代替
附完整的代码
main.go
1package main
2
3import (
4 "fmt"
5 "logAgent/conf"
6 "logAgent/kafka"
7 "logAgent/taillog"
8 "time"
9
10 "gopkg.in/ini.v1"
11)
12
13var (
14 cfg = new(conf.AppConf)
15)
16
17func run() {
18 for {
19 select {
20 case line := <-taillog.ReadChan():
21 kafka.SendToKafka(cfg.KafkaConf.Topic, line.Text)
22 default:
23 time.Sleep(time.Millisecond * 500)
24 }
25 }
26}
27
28func main() {
29 // 0.加载配置文件
30 err := ini.MapTo(cfg, "conf/config.ini")
31 if err != nil {
32 fmt.Printf("load conf failed, err:%v\n", err)
33 return
34 }
35 // 1.初始化kafka
36 err = kafka.Init([]string{cfg.KafkaConf.Address})
37 if err != nil {
38 fmt.Printf("init kafka failed ,err:%v\n", err)
39 return
40 }
41 fmt.Println("init kafka success")
42 // 2.初始化taillog
43 err = taillog.Init(cfg.TailConf.Filename)
44 if err != nil {
45 fmt.Printf("init taillog failed, err:%v\n", err)
46 return
47 }
48 fmt.Println("init taillog success")
49
50 // 3.执行任务
51 run()
52}
kafka/kafka.go
1package kafka
2
3import (
4 "fmt"
5
6 "github.com/Shopify/sarama"
7)
8
9var (
10 client sarama.SyncProducer
11)
12
13func Init(addrs []string) (err error) {
14 config := sarama.NewConfig()
15
16 config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
17 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partition
18 config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
19
20 // 连接kafka
21 client, err = sarama.NewSyncProducer(addrs, config)
22 if err != nil {
23 fmt.Println("producer closed, err:", err)
24 return
25 }
26 return
27}
28
29func SendToKafka(topic, data string) {
30 // 构造⼀个消息
31 msg := &sarama.ProducerMessage{}
32 msg.Topic = topic
33 msg.Value = sarama.StringEncoder(data)
34
35 // 发送消息
36 pid, offset, err := client.SendMessage(msg)
37 if err != nil {
38 fmt.Printf("send msg failed, err: %v\n", err)
39 return
40 }
41 fmt.Printf("pid:%v offset:%v\n", pid, offset)
42}
tailog/tail.go
1package taillog
2
3import (
4 "fmt"
5
6 "github.com/hpcloud/tail"
7)
8
9var (
10 tailObj *tail.Tail
11)
12
13func Init(filename string) (err error) {
14 config := tail.Config{
15 ReOpen: true,
16 Follow: true,
17 Location: &tail.SeekInfo{Offset: 0, Whence: 2},
18 MustExist: false,
19 Poll: true}
20 tailObj, err = tail.TailFile(filename, config)
21 if err != nil {
22 fmt.Println("tail file failed, err:", err)
23 return
24 }
25 return
26}
27
28func ReadChan() <-chan *tail.Line {
29 return tailObj.Lines
30}
conf/config.go
1package conf
2
3type AppConf struct{
4 KafkaConf `ini:"kafka"`
5 TailConf `ini:"tail"`
6}
7
8type KafkaConf struct{
9 Address string `ini:"address"`
10 Topic string `ini:"topic"`
11}
12
13type TailConf struct{
14 Filename string `ini:"filename"`
15}
conf/config.ini
1[kafka]
2address = 127.0.0.1:29092
3topic = web_log
4
5[tail]
6filename = my.log
编译成二进制文件
1go build
跨平台编译
1GOOS=linux GOARCH=amd64 go build
编译完成之后,只需要logAgent
和conf/config.ini
两个文件即可运行
Github地址:logAgent
- 原文作者:黄忠德
- 原文链接:https://huangzhongde.cn/post/2020-03-03-golang_devops_logAgent_1_write_log_to_kafka/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。