Go运维开发之日志收集(2)从etcd中获取配置信息
Go运维开发之日志收集(2)从etcd中获取配置信息
在上一篇中我们已经实现了单个日志收集后发送到kafka中
我们想一下下面的几个问题:
- 怎么样实现多个日志收集?
- 每收集一个日志都运行一个logAgent?
- 每次收集日志都需要手动去服务器配置?
- 有没有办法从配置服务器获取?
带着这些疑问,这里我们继续开发我们的日志收集服务,在这一节中,我们引入etcd服务,通过etcd进行配置的增删改实现日志的收集。
- 实现日志收集项从etcd获取
环境准备
这一节我们需要用到etcd服务器,可以从官网的Github下载http://github.com/etcd-io/etcd/releases
下载之后直接二进制运行
1etcd --logger=zap
默认监听在2379(client)和2380(集群通信)端口上
或者还是使用docker-compose.yaml
文件
1version: "3"
2
3networks:
4 app-kafka:
5 driver: bridge
6
7services:
8 zookeeper:
9 container_name: zookeeper
10 image: zookeeper:3.4.14
11 restart: always
12 networks:
13 - app-kafka
14 kafka:
15 container_name: kafka
16 image: bitnami/kafka:2.4.0
17 restart: always
18 # 后面三条是暴露给外网使用
19 environment:
20 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
21 - ALLOW_PLAINTEXT_LISTENER=yes
22 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
23 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
24 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
25 ports:
26 - 127.0.0.1:9092:9092
27 - 127.0.0.1:29092:29092
28 networks:
29 - app-kafka
30 etcd:
31 container_name: etcd
32 image: bitnami/etcd:3
33 restart: always
34 environment:
35 - ALLOW_NONE_AUTHENTICATION=yes
36 ports:
37 - 127.0.0.1:2379:2379
38 networks:
39 - app-kafka
改完之后直接运行
1docker-compose up -d etcd # 运行单个服务,如果之前的没有停的话
2docker-compose up -d # 运行yaml文件里面所有的服务
在开始之前我们先熟悉一下etcd的基础操作。
etcd基础操作
Put/Get操作
这里我们使用官方的库go.etcd.io/etcd/clientv3
新建一个目录etcd_demo
,新建etc.go
文件
1package main
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "go.etcd.io/etcd/clientv3"
9)
10
11func main() {
12 cli, err := clientv3.New(clientv3.Config{
13 Endpoints: []string{"127.0.0.1:2379"},
14 DialTimeout: 5 * time.Second})
15 if err != nil {
16 fmt.Printf("init etcd failed, err: %v\n", err)
17 return
18 }
19 fmt.Println("init etcd success.")
20
21 defer cli.Close()
22 // put操作
23 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
24 _, err = cli.Put(ctx, "/name", "jerry")
25 cancel()
26 if err != nil {
27 fmt.Printf("put to etcd failed, err:%v\n", err)
28 return
29 }
30 fmt.Println("put to etcd success.")
31
32 // get操作
33 ctx, cancel = context.WithTimeout(context.Background(), time.Second)
34 resp, err := cli.Get(ctx, "/name")
35 cancel()
36
37 if err != nil {
38 fmt.Println("get from etcd failed, err:", err)
39 return
40 }
41 for _, v := range resp.Kvs {
42 fmt.Printf("Key:%s,Value:%s\n", v.Key, v.Value)
43 }
然后我们尝试构建一下
1go mod init
2go build
过程中出现错误,原因是grpc版本过高导致的,解决办法etcd undefined: resolver.BuildOption
正常运行之后,我们看到下面的结果
1init etcd success.
2put to etcd success.
3Key:name,Value:jerry
Watch操作
创建一个etcd_watch的目录,在该目录下创建etcd_watch.go
文件
1package main
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "go.etcd.io/etcd/clientv3"
9)
10
11func main() {
12 cli, err := clientv3.New(clientv3.Config{
13 Endpoints: []string{"127.0.0.1:2379"},
14 DialTimeout: time.Second,
15 })
16 if err != nil {
17 fmt.Printf("connect to etcd failed, err: %v\n", err)
18 return
19 }
20 fmt.Println("connect etcd success.")
21
22 defer cli.Close()
23
24 // Watch操作
25 wch := cli.Watch(context.Background(), "/name")
26 for resp := range wch {
27 for _, ev := range resp.Events {
28 fmt.Printf("Type: %v, Key:%v, Value:%v\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
29 }
30 }
31}
构建运行,然后尝试通过etcdctl
向etcd指定的key/name
发送数据测试
1etcdctl --endpoints=http://localhost:2379 put /name "abc"
2etcdctl --endpoints=http://localhost:2379 put /name "jerry"
etcd_watch的运行结果
1./etcd_watch
2connect etcd success.
3Type: PUT, Key:/name, Value:abc
4Type: PUT, Key:/name, Value:jerry
客户端操作
1# 新增/修改
2etcdctl --endpoints=http://localhost:2379 put /name "abc"
3# 获取
4etcdctl --endpoints=http://localhost:2379 get /name
5# 删除
6etcdctl --endpoints=http://localhost:2379 del /name
logAgent_v0.1.1(引入etcd)
修改配置文件,将之前的topic和日志文件的路径去掉,添加etcd服务器地址,准备从etcd获取topic和日志文件的路径
conf/config.ini
1[kafka]
2address = 127.0.0.1:29092
3
4[etcd]
5address = 127.0.0.1:2379
6timeout = 5
7collect_log_key = /logagent/collect # 收集的日志的key
修改对应的结构体conf/config.go
1package conf
2
3type AppConf struct {
4 KafkaConf `ini:"kafka"`
5 EtcdConf `ini:"etcd"`
6}
7
8type KafkaConf struct {
9 Address string `ini:"address"`
10}
11
12type EtcdConf struct {
13 Address string `ini:"address"`
14 Timeout int `ini:"timeout"`
15 Key string `ini:"collect_log_key"`
16}
在logAgent项目中创建etcd目录,并创建etcd.go文件
etcd/etcd.go
内容如下:
1package etcd
2
3import (
4 "fmt"
5 "time"
6
7 "go.etcd.io/etcd/clientv3"
8)
9
10var (
11 client *clientv3.Client
12)
13
14func Init(addrs string, timeout time.Duration) (err error) {
15 client, err = clientv3.New(clientv3.Config{
16 Endpoints: []string{addrs},
17 DialTimeout: timeout})
18 if err != nil {
19 fmt.Printf("init etcd failed, err: %v\n", err)
20 return
21 }
22 return
23}
man.go函数初始化的时候初始化etcd
,并将之前的run和初始化taillog的注释掉
1 // 2. 初始化etcd
2 err = etcd.Init(cfg.EtcdConf.Address, time.Duration(cfg.EtcdConf.Timeout)*time.Second)
3 if err != nil {
4 fmt.Printf("init etcd failed, err:%v\n", err)
5 return
6 }
7 fmt.Println("init etcd success.")
从etcd中获取配置信息
配置信息定义如下:
[{"path":"/var/log/nginx/access.log","topic":"web_log"},{"path":"/var/log/message","topic":"system_log"}]
根据key从etcd中获取配置项
etcd/etcd.go新增一个
GetConf函数用于获取配置信息
1// 需要收集的日志的配置信息
2type LogEntry struct {
3 Path string `json:"path"` // 日志存放的路径
4 Topic string `json:"topic"` // 日志要发往Kafka中的哪个Topic
5}
6
7func GetConf(key string) (logEntryConf []*LogEntry, err error) {
8 // get
9 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
10 resp, err := client.Get(ctx, key)
11 cancel()
12 if err != nil {
13 fmt.Printf("get from etcd failed, err:%v\n", err)
14 return
15 }
16 for _, ev := range resp.Kvs {
17 err = json.Unmarshal(ev.Value, &logEntryConf)
18 if err != nil {
19 fmt.Printf("unmarshal etcd value failed,err:%v\n", err)
20 return
21 }
22 }
23 return
24}
main.go
获取配置信息并打印
1 // 2.1 从etcd中获取日志收集项的配置信息
2 logEntryConf, err := etcd.GetConf(cfg.EtcdConf.Key)
3 if err != nil {
4 fmt.Printf("get conf from etcd failed,err:%v\n", err)
5 return
6 }
7 fmt.Printf("get conf from etcd success, %v\n", logEntryConf)
8 for index, value := range logEntryConf {
9 fmt.Printf("index:%v value:%v\n", index, value)
10 }
测试能否正常拿到值
1# 通过命令行客户端将配置信息写入etcd
2etcdctl --endpoints=http://localhost:2379 put /logagent/collect '[{"path":"/var/log/nginx/access.log","topic":"web_log"},{"path":"/var/log/messages","topic":"system_log"}]'
3
4# 获取配置
5go build
6./logAgent
7init kafka success
8init etcd success.
9get conf from etcd success, [0xc0000c6240 0xc0000c62c0]
10index:0 value:&{/var/log/nginx/access.log web_log}
11index:1 value:&{/var/log/messages system_log}
好的,现在我们拿到了配置信息,下一步就是拿着这些配置项进行日志收集。
- 原文作者:黄忠德
- 原文链接:https://huangzhongde.cn/post/2020-03-04-golang_devops_logAgent_2_get_config_from_etcd/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。