Go运维开发之日志收集(1)收集应用程序日志到kafka中

需求背景

每个业务系统都有自己的⽇志,当业务系统出现问题时,需要通过查找⽇志信息来定位和解决问题。 当业务系统服务器⽐较少时,登陆到服务器上查看即可满⾜。但当系统机器规模巨⼤,登陆到服务器上查看⼏乎不现实(分布式的系统,⼀个系统部署在⼗⼏甚至几十台服务器上)

解决方案

把服务器上的⽇志实时收集,统⼀存储到中⼼系统。 再对这些⽇志建⽴索引,通过搜索即可快速找到对应的⽇志记录。 通过提供⼀个界⾯友好的web⻚⾯实现⽇志展示与检索。

业界方案

有早期的ELK到现在的EFK。ELK在每台服务器上部署logstash,比较重量级,所以演化成客户端部署filebeat的EFK,由filebeat收集向logstash中写数据,最后落地到elasticsearch,通过kibana界面进行日志检索。

优缺点

优点:现成的解决方案,直接拿过来用,能够实现日志收集与检索。

缺点:

  • 运维成本⾼,每增加⼀个⽇志收集项,都需要⼿动修改配置。
  • 监控缺失,⽆法准确获取logstash的状态。⽆法做到定制化开发与维护。
  • ⽆法做到定制化开发与维护。

准备开发环境

为了快速开发,这边提供一个docker-compose.yaml文件,直接启动。

docker-compose.yaml内容如下:

 1version: "3"
 2
 3networks:
 4  app-kafka:
 5    driver: bridge
 6
 7services:
 8  zookeeper:
 9    container_name: zookeeper
10    image: zookeeper:3.4.14
11    restart: always
12    networks:
13      - app-kafka
14  kafka:
15    container_name: kafka
16    image: bitnami/kafka:2.4.0
17    restart: always
18    environment: 
19      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
20      - ALLOW_PLAINTEXT_LISTENER=yes
21      # 后面三条是暴露给外网使用
22      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
23      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
24      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
25    ports:
26    - 127.0.0.1:9092:9092
27      # 外网使用29092进行访问
28    - 127.0.0.1:29092:29092
29    networks:
30      - app-kafka

启动kafka

1docker-compose up -d

kafka依赖zookeeper的,所以有2个镜像,kafka环境变量里面指定了zookeeper的地址。


注意

kafka对外提供服务的时候,需要配置 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP , KAFKA_CFG_LISTENERS , KAFKA_CFG_ADVERTISED_LISTENERS 三个环境变量,然后增加一个对外提供服务的端口,不然会提示no such host错误


kafka库的基本操作

这里使用saramakafka进行操作

gitHub.com/Shopify/sarama

新建一个kafka_demo的目录,创建一个main.go文件

1mkdir <GOPATH>/src/kafka_demo
2cd kafka_demo
3go mod init
4ls
5go.mod main.go

main.go内容如下:

 1package main
 2
 3import (
 4	"fmt"
 5	"github.com/Shopify/sarama"
 6)
 7
 8// 基于sarama第三⽅库开发的kafka client
 9func main() {
10	config := sarama.NewConfig()
11
12	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
13	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partition
14	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
15  
16	// 构造⼀个消息
17	msg := &sarama.ProducerMessage{}
18	msg.Topic = "web_log"
19	msg.Value = sarama.StringEncoder("this is a test log")
20  
21	// 连接kafka
22	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:29092"}, config)
23	if err != nil {
24		fmt.Println("producer closed, err:", err)
25		return
26	}
27  
28	defer client.Close()
29	// 发送消息
30	pid, offset, err := client.SendMessage(msg)
31	if err != nil {
32		fmt.Printf("send msg failed, err: %v\n", err)
33		return
34	}
35	fmt.Printf("pid:%v offset:%v\n", pid, offset)
36}

测试能否正常发送到kafka

1go run main.go
2pid:0 offset:0

我们连上kakfa看一下

1docker exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic web_log --from-beginning
2this is a test log

好,能正常接收到消息

tail库的基本操作

这里使用第三方库github.com/hpcloud/tail

新建一个tail_demo的目录

1mkdir <GOPATH>/src/tail_demo
2cd tail_demo
3go mod init
4ls
5go.mod main.go

main.go内容如下:

 1package main
 2
 3import (
 4	"fmt"
 5	"github.com/hpcloud/tail"
 6	"time"
 7)
 8
 9// tail demo
10func main() {
11	fileName := "./my.log"
12	config := tail.Config{
13		ReOpen:    true,
14		Follow:    true,
15		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
16		MustExist: false,
17		Poll:      true}
18	tails, err := tail.TailFile(fileName, config)
19	if err != nil {
20		fmt.Println("tail file failed, err:", err)
21		return
22	}
23	var (
24		msg *tail.Line
25		ok  bool
26	)
27	for {
28		msg, ok = <-tails.Lines
29		if !ok {
30			fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
31			time.Sleep(time.Second)
32			continue
33		}
34		fmt.Println("msg:", msg.Text)
35	}
36}

运行main.go

1go run main.go
22020/03/03 11:10:01 Waiting for ./my.log to appear...

新建一个my.log文件,向文件中写入文件

1touch my.log
2echo "this is a test log." >> my.log

程序日志:

12020/03/03 11:14:28 Seeked ./my.log - &{Offset:0 Whence:2}
2msg: this is a test log.

说明tail库运行正常。

logAgent_0.0.1(业务实现)

现在使用tail库能读取到日志,使用sarama库能到推送消息到kafka,我们结合这两个库,实现一边读取文件日志,一遍写入到kafka。

目录结构

1mkdir -p logAgent/{kafka,taillog} 
2ls logAgent
3kafka   taillog
4cd logAgent
5go mod init

新建kafka/kafka.gotaillog/tail.go,分别先建立一个初始化函数

kafka/kafka.go

 1package kafka
 2
 3import (
 4	"fmt"
 5
 6	"github.com/Shopify/sarama"
 7)
 8
 9var (
10    client sarama.SyncProducer
11)
12
13func Init(addrs []string) (err error) {
14	config := sarama.NewConfig()
15
16	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
17	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partition
18	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
19
20	// 连接kafka
21	client, err = sarama.NewSyncProducer([]string{addrs}, config)
22	if err != nil {
23		fmt.Println("producer closed, err:", err)
24		return
25	}
26	return
27}

taillog/tail.go

 1package taillog
 2
 3import (
 4	"fmt"
 5
 6	"github.com/hpcloud/tail"
 7)
 8
 9var (
10	tailObj *tail.Tail
11)
12
13func Init(filename string) (err error) {
14	config := tail.Config{
15		ReOpen:    true,
16		Follow:    true,
17		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
18		MustExist: false,
19		Poll:      true}
20	tailObj, err = tail.TailFile(filename, config)
21	if err != nil {
22		fmt.Println("tail file failed, err:", err)
23		return
24	}
25	return
26}

main.go内容如下

 1package main
 2
 3import (
 4	"fmt"
 5	"logAgent/kafka"
 6	"logAgent/taillog"
 7)
 8
 9func main() {
10	// 1.初始化kafka
11	err := kafka.Init([]string{"127.0.0.1:29092"})
12	if err != nil {
13		fmt.Printf("init kafka failed ,err:%v\n", err)
14		return
15	}
16	fmt.Println("init kafka success")
17	// 2.初始化taillog
18	err = taillog.Init("./my.log")
19	if err != nil {
20		fmt.Printf("init taillog failed, err:%v\n", err)
21		return
22	}
23	fmt.Println("init taillog success")
24}

都初始化之后,就是怎么将日志发给kafka了。

taillog/tail.go中创建一个ReadChan函数

1func ReadChan() <-chan *tail.Line {
2	return tailObj.Lines
3}

kafka/kafka.go中创建一个SendToKafka的函数,该函数接收从外部提供的topic和data参数

 1func SendToKafka(topic, data string) {
 2	// 构造⼀个消息
 3	msg := &sarama.ProducerMessage{}
 4	msg.Topic = topic
 5	msg.Value = sarama.StringEncoder(data)
 6
 7	// 发送消息
 8	pid, offset, err := client.SendMessage(msg)
 9	if err != nil {
10		fmt.Printf("send msg failed, err: %v\n", err)
11		return
12	}
13	fmt.Printf("pid:%v offset:%v\n", pid, offset)
14}

main.go中创建run函数,执行具体的任务,并在main函数中调用它

 1func run() {
 2	for {
 3		select {
 4		case line := <-taillog.ReadChan():
 5			kafka.SendToKafka("web_log", line.Text)
 6		default:
 7			time.Sleep(time.Millisecond * 500)
 8		}
 9	}
10}

构建测试一下

1go build
2./logAgent
3init kafka success
4init taillog success
52020/03/03 12:02:09 Waiting for ./my.log to appear...

往my.log中写入一点数据进行测试。

logAgent_0.0.2(引入配置文件)

之前的kafka地址,topic以及日志文件都是写死在应用程序中,不具备移植性,我们通过配置文件将其变为可配置的。

解析配置文件的库有很多,格式也有很多:ini,toml,yaml等,这里用到无闻大神的ini库gopkg.in/ini.v1

新建一个conf目录,并建立config.ini配置文件

conf/config.ini文件内容如下:

1[kafka]
2address = 127.0.0.1:29092
3topic = web_log
4
5[tail]
6filename = my.log

新建一个结构体文件config.go,用于映射这些字段

conf/config.go内容如下:

 1package conf
 2
 3type AppConf struct{
 4	KafkaConf `ini:"kafka"`
 5	TailConf `ini:"tail"`
 6}
 7
 8type KafkaConf struct{
 9	Address string `ini:"address"`
10	Topic string `ini:"topic"`
11}
12
13type TailConf struct{
14	Filename string `ini:"filename"`
15}

接下来就是怎么映射这些配置信息到ini配置文件

修改main.go添加一个变量cfg并使用ini.MapTo方法

 1var (
 2	cfg = new(conf.AppConf)
 3)
 4
 5// 在main函数开始的时候加载配置文件
 6// 0.加载配置文件
 7	err := ini.MapTo(cfg, "conf/config.ini")
 8	if err != nil {
 9		fmt.Printf("load conf failed, err:%v\n", err)
10		return
11	}

然后,将之前写死的地方用cfg下的结构体代替

附完整的代码

main.go

 1package main
 2
 3import (
 4	"fmt"
 5	"logAgent/conf"
 6	"logAgent/kafka"
 7	"logAgent/taillog"
 8	"time"
 9
10	"gopkg.in/ini.v1"
11)
12
13var (
14	cfg = new(conf.AppConf)
15)
16
17func run() {
18	for {
19		select {
20		case line := <-taillog.ReadChan():
21			kafka.SendToKafka(cfg.KafkaConf.Topic, line.Text)
22		default:
23			time.Sleep(time.Millisecond * 500)
24		}
25	}
26}
27
28func main() {
29	// 0.加载配置文件
30	err := ini.MapTo(cfg, "conf/config.ini")
31	if err != nil {
32		fmt.Printf("load conf failed, err:%v\n", err)
33		return
34	}
35	// 1.初始化kafka
36	err = kafka.Init([]string{cfg.KafkaConf.Address})
37	if err != nil {
38		fmt.Printf("init kafka failed ,err:%v\n", err)
39		return
40	}
41	fmt.Println("init kafka success")
42	// 2.初始化taillog
43	err = taillog.Init(cfg.TailConf.Filename)
44	if err != nil {
45		fmt.Printf("init taillog failed, err:%v\n", err)
46		return
47	}
48	fmt.Println("init taillog success")
49
50	// 3.执行任务
51	run()
52}

kafka/kafka.go

 1package kafka
 2
 3import (
 4	"fmt"
 5
 6	"github.com/Shopify/sarama"
 7)
 8
 9var (
10	client sarama.SyncProducer
11)
12
13func Init(addrs []string) (err error) {
14	config := sarama.NewConfig()
15
16	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
17	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出⼀个partition
18	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
19
20	// 连接kafka
21	client, err = sarama.NewSyncProducer(addrs, config)
22	if err != nil {
23		fmt.Println("producer closed, err:", err)
24		return
25	}
26	return
27}
28
29func SendToKafka(topic, data string) {
30	// 构造⼀个消息
31	msg := &sarama.ProducerMessage{}
32	msg.Topic = topic
33	msg.Value = sarama.StringEncoder(data)
34
35	// 发送消息
36	pid, offset, err := client.SendMessage(msg)
37	if err != nil {
38		fmt.Printf("send msg failed, err: %v\n", err)
39		return
40	}
41	fmt.Printf("pid:%v offset:%v\n", pid, offset)
42}

tailog/tail.go

 1package taillog
 2
 3import (
 4	"fmt"
 5
 6	"github.com/hpcloud/tail"
 7)
 8
 9var (
10	tailObj *tail.Tail
11)
12
13func Init(filename string) (err error) {
14	config := tail.Config{
15		ReOpen:    true,
16		Follow:    true,
17		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
18		MustExist: false,
19		Poll:      true}
20	tailObj, err = tail.TailFile(filename, config)
21	if err != nil {
22		fmt.Println("tail file failed, err:", err)
23		return
24	}
25	return
26}
27
28func ReadChan() <-chan *tail.Line {
29	return tailObj.Lines
30}

conf/config.go

 1package conf
 2
 3type AppConf struct{
 4	KafkaConf `ini:"kafka"`
 5	TailConf `ini:"tail"`
 6}
 7
 8type KafkaConf struct{
 9	Address string `ini:"address"`
10	Topic string `ini:"topic"`
11}
12
13type TailConf struct{
14	Filename string `ini:"filename"`
15}

conf/config.ini

1[kafka]
2address = 127.0.0.1:29092
3topic = web_log
4
5[tail]
6filename = my.log

编译成二进制文件

1go build

跨平台编译

1GOOS=linux GOARCH=amd64 go build

编译完成之后,只需要logAgentconf/config.ini两个文件即可运行

Github地址:logAgent