Go运维开发之日志收集(2)从etcd中获取配置信息

Go运维开发之日志收集(1)收集应用程序日志到kafka中

在上一篇中我们已经实现了单个日志收集后发送到kafka中

我们想一下下面的几个问题:

  • 怎么样实现多个日志收集?
  • 每收集一个日志都运行一个logAgent?
  • 每次收集日志都需要手动去服务器配置?
  • 有没有办法从配置服务器获取?

带着这些疑问,这里我们继续开发我们的日志收集服务,在这一节中,我们引入etcd服务,通过etcd进行配置的增删改实现日志的收集。

  1. 实现日志收集项从etcd获取

环境准备

这一节我们需要用到etcd服务器,可以从官网的Github下载http://github.com/etcd-io/etcd/releases

下载之后直接二进制运行

1etcd --logger=zap

默认监听在2379(client)和2380(集群通信)端口上

或者还是使用docker-compose.yaml文件

 1version: "3"
 2
 3networks:
 4  app-kafka:
 5    driver: bridge
 6
 7services:
 8  zookeeper:
 9    container_name: zookeeper
10    image: zookeeper:3.4.14
11    restart: always
12    networks:
13      - app-kafka
14  kafka:
15    container_name: kafka
16    image: bitnami/kafka:2.4.0
17    restart: always
18    # 后面三条是暴露给外网使用
19    environment: 
20      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
21      - ALLOW_PLAINTEXT_LISTENER=yes
22      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
23      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
24      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
25    ports:
26    - 127.0.0.1:9092:9092
27    - 127.0.0.1:29092:29092
28    networks:
29      - app-kafka
30  etcd:
31    container_name: etcd
32    image: bitnami/etcd:3
33    restart: always
34    environment:
35      - ALLOW_NONE_AUTHENTICATION=yes
36    ports: 
37    - 127.0.0.1:2379:2379
38    networks: 
39      - app-kafka

改完之后直接运行

1docker-compose up -d etcd # 运行单个服务,如果之前的没有停的话
2docker-compose up -d      # 运行yaml文件里面所有的服务

在开始之前我们先熟悉一下etcd的基础操作。

etcd基础操作

Put/Get操作

这里我们使用官方的库go.etcd.io/etcd/clientv3

新建一个目录etcd_demo,新建etc.go文件

 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"time"
 7
 8	"go.etcd.io/etcd/clientv3"
 9)
10
11func main() {
12	cli, err := clientv3.New(clientv3.Config{
13		Endpoints:   []string{"127.0.0.1:2379"},
14		DialTimeout: 5 * time.Second})
15	if err != nil {
16		fmt.Printf("init etcd failed, err: %v\n", err)
17		return
18	}
19	fmt.Println("init etcd success.")
20
21	defer cli.Close()
22	// put操作
23	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
24	_, err = cli.Put(ctx, "/name", "jerry")
25	cancel()
26	if err != nil {
27		fmt.Printf("put to etcd failed, err:%v\n", err)
28		return
29	}
30	fmt.Println("put to etcd success.")
31
32	// get操作
33	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
34	resp, err := cli.Get(ctx, "/name")
35	cancel()
36
37	if err != nil {
38		fmt.Println("get from etcd failed, err:", err)
39		return
40	}
41	for _, v := range resp.Kvs {
42		fmt.Printf("Key:%s,Value:%s\n", v.Key, v.Value)
43	}

然后我们尝试构建一下

1go mod init
2go build

过程中出现错误,原因是grpc版本过高导致的,解决办法etcd undefined: resolver.BuildOption

正常运行之后,我们看到下面的结果

1init etcd success.
2put to etcd success.
3Key:name,Value:jerry

Watch操作

创建一个etcd_watch的目录,在该目录下创建etcd_watch.go文件

 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"time"
 7
 8	"go.etcd.io/etcd/clientv3"
 9)
10
11func main() {
12	cli, err := clientv3.New(clientv3.Config{
13		Endpoints:   []string{"127.0.0.1:2379"},
14		DialTimeout: time.Second,
15	})
16	if err != nil {
17		fmt.Printf("connect to etcd failed, err: %v\n", err)
18		return
19	}
20	fmt.Println("connect etcd success.")
21
22	defer cli.Close()
23
24	// Watch操作
25	wch := cli.Watch(context.Background(), "/name")
26	for resp := range wch {
27		for _, ev := range resp.Events {
28			fmt.Printf("Type: %v, Key:%v, Value:%v\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
29		}
30	}
31}

构建运行,然后尝试通过etcdctl向etcd指定的key/name发送数据测试

1etcdctl --endpoints=http://localhost:2379 put /name "abc"
2etcdctl --endpoints=http://localhost:2379 put /name "jerry"

etcd_watch的运行结果

1./etcd_watch
2connect etcd success.
3Type: PUT, Key:/name, Value:abc
4Type: PUT, Key:/name, Value:jerry

客户端操作

1# 新增/修改
2etcdctl --endpoints=http://localhost:2379 put /name "abc"
3# 获取
4etcdctl --endpoints=http://localhost:2379 get /name 
5# 删除
6etcdctl --endpoints=http://localhost:2379 del /name 

logAgent_v0.1.1(引入etcd)

修改配置文件,将之前的topic和日志文件的路径去掉,添加etcd服务器地址,准备从etcd获取topic和日志文件的路径

conf/config.ini

1[kafka]
2address = 127.0.0.1:29092
3
4[etcd]
5address = 127.0.0.1:2379
6timeout = 5
7collect_log_key = /logagent/collect # 收集的日志的key

修改对应的结构体conf/config.go

 1package conf
 2
 3type AppConf struct {
 4	KafkaConf `ini:"kafka"`
 5	EtcdConf  `ini:"etcd"`
 6}
 7
 8type KafkaConf struct {
 9	Address string `ini:"address"`
10}
11
12type EtcdConf struct {
13	Address string `ini:"address"`
14	Timeout int    `ini:"timeout"`
15	Key     string `ini:"collect_log_key"`
16}

在logAgent项目中创建etcd目录,并创建etcd.go文件

etcd/etcd.go内容如下:

 1package etcd
 2
 3import (
 4	"fmt"
 5	"time"
 6
 7	"go.etcd.io/etcd/clientv3"
 8)
 9
10var (
11	client *clientv3.Client
12)
13
14func Init(addrs string, timeout time.Duration) (err error) {
15	client, err = clientv3.New(clientv3.Config{
16		Endpoints:   []string{addrs},
17		DialTimeout: timeout})
18	if err != nil {
19		fmt.Printf("init etcd failed, err: %v\n", err)
20		return
21	}
22	return
23}

man.go函数初始化的时候初始化etcd,并将之前的run和初始化taillog的注释掉

1  // 2. 初始化etcd
2	err = etcd.Init(cfg.EtcdConf.Address, time.Duration(cfg.EtcdConf.Timeout)*time.Second)
3	if err != nil {
4		fmt.Printf("init etcd failed, err:%v\n", err)
5		return
6	}
7	fmt.Println("init etcd success.")

从etcd中获取配置信息

配置信息定义如下:

[{"path":"/var/log/nginx/access.log","topic":"web_log"},{"path":"/var/log/message","topic":"system_log"}]

根据key从etcd中获取配置项

etcd/etcd.go新增一个GetConf函数用于获取配置信息

 1// 需要收集的日志的配置信息
 2type LogEntry struct {
 3	Path  string `json:"path"`  // 日志存放的路径
 4	Topic string `json:"topic"` // 日志要发往Kafka中的哪个Topic
 5}
 6
 7func GetConf(key string) (logEntryConf []*LogEntry, err error) {
 8	// get
 9	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
10	resp, err := client.Get(ctx, key)
11	cancel()
12	if err != nil {
13		fmt.Printf("get from etcd failed, err:%v\n", err)
14		return
15	}
16	for _, ev := range resp.Kvs {
17		err = json.Unmarshal(ev.Value, &logEntryConf)
18		if err != nil {
19			fmt.Printf("unmarshal etcd value failed,err:%v\n", err)
20			return
21		}
22	}
23	return
24}

main.go获取配置信息并打印

 1	// 2.1 从etcd中获取日志收集项的配置信息
 2	logEntryConf, err := etcd.GetConf(cfg.EtcdConf.Key)
 3	if err != nil {
 4		fmt.Printf("get conf from etcd failed,err:%v\n", err)
 5		return
 6	}
 7	fmt.Printf("get conf from etcd success, %v\n", logEntryConf)
 8	for index, value := range logEntryConf {
 9		fmt.Printf("index:%v value:%v\n", index, value)
10	}

测试能否正常拿到值

 1# 通过命令行客户端将配置信息写入etcd
 2etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"/var/log/nginx/access.log","topic":"web_log"},{"path":"/var/log/messages","topic":"system_log"}]'
 3
 4# 获取配置
 5go build
 6./logAgent
 7init kafka success
 8init etcd success.
 9get conf from etcd success, [0xc0000c6240 0xc0000c62c0]
10index:0 value:&{/var/log/nginx/access.log web_log}
11index:1 value:&{/var/log/messages system_log}

好的,现在我们拿到了配置信息,下一步就是拿着这些配置项进行日志收集。