Go运维开发之日志收集(7)将日志入库到Elasticsearch并通过Kibana进行展示

Go运维开发之日志收集(1)收集应用程序日志到kafka中

Go运维开发之日志收集(2)从etcd中获取配置信息

Go运维开发之日志收集(3)根据etcd配置项创建多个tailTask

Go运维开发之日志收集(4)监视etcd配置项的变更

Go运维开发之日志收集(5)根据IP地址去拉取配置日志收集项

Go运维开发之日志收集(6)从kafka中获取日志信息

前面我们已经kafka取到日志文件了,接下来就是保存到ES中。

项目整理

我们之前开发的叫logAgent和现在的logTransfer是一起的,我把它整理到一个repo了

改成logCollect了,在这个项目下创建2个服务:logAgent和logTransfer

将日志保存到ES中

es包创建一个SendToEs的方法将数据保存到ES。

es/es.go

 1// SendToEs 获取记录存入Es中
 2func SendToEs(index string, data interface{}) {
 3	// 新增记录
 4	// p1 := User{Name: "jerry", Age: 30, Email: "hzde0128@live.cn"}
 5	put1, err := client.Index().
 6		Index(index).
 7		BodyJson(data).
 8		Do(context.Background())
 9	if err != nil {
10		// Handle error
11		panic(err)
12	}
13	fmt.Printf("Indexed %s %s to index %s, type %s\n", index, put1.Id, put1.Index, put1.Type)
14}

我们从前面的示例中看到它接收的是一组JSON串,之前是直接在终端打印,我们从那个地方开始改造。

kafka/kafka.go

 1// LogData 造一个结构体存放json数据
 2type LogData struct {
 3	Data string `json:"data"`
 4}
 5
 6// Init 初始化kafka连接,准备发送数据给es
 7func Init(addr []string, topic string) (err error) {
 8	consumer, err := sarama.NewConsumer(addr, nil)
 9	if err != nil {
10		fmt.Printf("fail to start consumer, err:%v\n", err)
11		return
12	}
13
14	partitionList, err := consumer.Partitions(topic) // 根据topic取到所有的分区
15	if err != nil {
16		fmt.Printf("fail to get list of partition:err%v\n", err)
17		return
18	}
19	fmt.Printf("topic:%s, partition:%v\n", topic, partitionList)
20	for partition := range partitionList { // 遍历所有的分区
21		// 针对每个分区创建一个对应的分区消费者
22		var cp sarama.PartitionConsumer
23		cp, err = consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
24		if err != nil {
25			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
26			return
27		}
28		// defer cp.AsyncClose()
29		// 异步从每个分区消费信息
30		var wg sync.WaitGroup
31		defer wg.Done()
32		wg.Add(1)
33		go func(sarama.PartitionConsumer) {
34			for msg := range cp.Messages() {
35				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
36				// 准备发送数据给es
37				ld := LogData{
38					Data: string(msg.Value),
39				}
40        // 拼装好数据发给ES
41				es.SendToEs(topic, ld) 
42			}
43		}(cp)
44		wg.Wait()
45	}
46	return
47}

构建测试

1go build
2./logTransfer
3load config success
4init es success
5topic:system_log, partition:[0]
6Partition:0 Offset:12675 Key:[] Value:Mar  5 15:55:57 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[8024]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/MacOS: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
7Indexed system_log 2uOxqXABInlWuVaaqUAl to index system_log, type _doc

已经看到有数据写入ES了,现在我们通过Kibana进行查看

Kibana查询

打开浏览器,输入kibana服务器的地址,我这是http://172.16.10.10:5601

dashboard

查看我们的索引列表

kibana-index-mgr

kibana-index-list

可以看到我们之前创建的索引文件都在上面

创建索引模式

kibana-index-pattern

创建完再打开discover可以看到我们的日志了,具体操作不赘述。

kibana-logs

函数调用函数优化

kafka调用es.SendToEs的时候是函数调用函数的方式,我们可以先存入一个通道中,将通道的大小放入配置文件

conf/config.ini

1[es]
2address = 172.16.10.10:9200
3max_chan_size = 100000
4nums = 16

修改结构体*conf/config.go*

1type EsConf struct {
2	Address     string `ini:"address"`
3	MaxChanSize int    `ini:"max_chan_size"`
4	Nums        int    `ini:"nums"`
5}

在es包中创建一个channel

es/es.go

 1type LogData struct {
 2	Topic string `json:"topic"`
 3	Data  string `json:"data"`
 4}
 5
 6var (
 7	client *elastic.Client
 8	ch     chan *LogData
 9)
10
11func Init(address string, maxChanSize, nums int) (err error) {
12	if !strings.HasPrefix(address, "http://") {
13		address = "http://" + address
14	}
15	client, err = elastic.NewClient(elastic.SetURL(address))
16	if err != nil {
17		// Handle error
18		return err
19	}
20	fmt.Println("connect to es success")
21	ch = make(chan *LogData, maxChanSize)
22	for i:=0;i<nums;i++{
23		go sendToEs()
24	}
25	return
26}
27
28func SendToChan(msg *LogData) {
29	ch <- msg
30}
31
32func sendToEs() {
33	// 链式操作
34	for {
35		select {
36		case msg := <-ch:
37			put1, err := client.Index().
38				Index(msg.Topic).
39				BodyJson(msg).
40				Do(context.Background())
41			if err != nil {
42				// Handle error
43				fmt.Println(err)
44				continue
45			}
46			fmt.Printf("Indexed %s %v to index %s, type %s\n", msg.Topic, put1.Id, put1.Index, put1.Type)
47		default:
48			time.Sleep(time.Second)
49		}
50	}
51}

修改kafka/kafka.go的执行写ES的入口

1        // 准备发送数据给es
2				ld := es.LogData{Topic: topic, Data:string(msg.Value)}
3				//es.SendToES(topic, ld) // 函数调用函数
4				// 优化一下: 直接放到一个chan中
5				es.SendToChan(&ld)

修改main.go函数es初始化的时候把MaxChanSize加上

main.go

1  err = es.Init(cfg.EsConf.Address, cfg.EsConf.MaxChanSize, cfg.EsConf.Nums)
2	if err != nil {
3		fmt.Printf("init ES client failed,err:%v\n", err)
4		return
5	}
6	fmt.Println("init es success.")

构建测试

删除之前的索引和记录,方便查找

1curl -X DELETE http://172.16.10.10:9200/system_log
2{"acknowledged":true}

重新构建,并观察kibana

 1go buid
 2./logTransfer
 3load config success
 4connect to es success
 5init es success.
 6topic:system_log, partition:[0]
 7Partition:0 Offset:15797 Key:[] Value:Mar  5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[14557]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/MacOS: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
 8Partition:0 Offset:15798 Key:[] Value:Mar  5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[14558]): Service could not initialize: Unable to set current working directory. error = 2: No such file or directory, path = /Applications/SunloginClient.app/Contents/Helpers: 19D76: xpcproxy + 14893 [673][F168B004-7C3C-33AD-8EA9-EF5E7F784A63]: 0x2
 9Partition:0 Offset:15799 Key:[] Value:Mar  5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service[14557]): Service exited with abnormal code: 78
10Partition:0 Offset:15800 Key:[] Value:Mar  5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.service): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
11Partition:0 Offset:15801 Key:[] Value:Mar  5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper[14558]): Service exited with abnormal code: 78
12Partition:0 Offset:15802 Key:[] Value:Mar  5 17:11:11 huangzhgdedeAir com.apple.xpc.launchd[1] (com.oray.sunlogin.helper): Service only ran for 0 seconds. Pushing respawn out by 10 seconds.
13Indexed system_log h-P2qXABInlWuVaajUdd to index system_log, type _doc
14Indexed system_log iOP2qXABInlWuVaajUdd to index system_log, type _doc
15Indexed system_log huP2qXABInlWuVaajUdd to index system_log, type _doc
16Indexed system_log heP2qXABInlWuVaajUdb to index system_log, type _doc
17Indexed system_log ieP2qXABInlWuVaajUe9 to index system_log, type _doc
18Indexed system_log hOP2qXABInlWuVaajUda to index system_log, type _doc

看日志是发送成功了,登录kibana进行查看

kibana-opit

已经能看到最新的数据了。

后期开发计划:

  • topic通过etcd来进行监视,实现灵活

  • ES索引按天划分,防止日志量过大导致日志查询慢

  • 日志优化…

Github代码地址:logCollect