Pulsar集群部署

简介

Apache Pulsar 是Apache软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案,截至 2021 年 3 月全球贡献者超过 380 位。

官方网站:http://pulsar.apache.org

Pulsar 的关键特性如下:

  • Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。

  • 极低的发布延迟和端到端延迟。

  • 可无缝扩展到超过一百万个 topic。

  • 简单的客户端 API,支持 Java、Go、Python 和 C++。

  • 支持多种 topic 订阅模式(独占订阅、共享订阅、故障转移订阅)。

  • 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。

    • 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
  • 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。

  • 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。

架构

Pulsar架构 在图中,客户端必须能够使用单个URL连接到Pulsar集群。在这种情况下,pulsar-cluster.acme.com包含了所有的broker的地址。Pulsar消息brokers与BookKeeper bookies一起运行;反过来,brokers和bookies都依赖ZooKeeper。

Pulsar集群部署

1. 主机规划

主机 操作系统 配置 IP 备注
ps-zk1 CentOS 7.6 4C8G100G 192.168.2.159 zookeeper
ps-zk2 CentOS 7.6 4C8G100G 192.168.2.160 zookeeper
ps-zk3 CentOS 7.6 4C8G100G 192.168.2.161 zookeeper
ps-bk1 CentOS 7.6 8C16G100G 192.168.2.162 bookkeeper, broker
ps-bk2 CentOS 7.6 8C16G100G 192.168.2.163 bookkeeper, broker
ps-bk3 CentOS 7.6 8C16G100G 192.168.2.164 bookkeeper, broker

2.基础配置

需要在所有节点操作

 1# 关闭防火墙
 2systemctl disable --now firewalld
 3# 关闭selinux
 4setenforce 0
 5sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config
 6# 设置时区
 7timedatectl set-timezone Asia/Shanghai
 8# 时间同步
 9yum -y install chrony
10systemctl enable --now chronyd

安装java

需要在所有节点执行

1yum -y install java-1.8.0
2
3# 验证版本
4java -version

看到如下信息,说明安装正常

1openjdk version "1.8.0_282"
2OpenJDK Runtime Environment (build 1.8.0_282-b08)
3OpenJDK 64-Bit Server VM (build 25.282-b08, mixed mode)

下载pulsar包

所有节点执行,或者其中一节点下载,然后scp过去

1yum -y install wget vim
2wget http://mirrors.tencent.com/apache/pulsar/pulsar-2.7.1/apache-pulsar-2.7.1-bin.tar.gz
3# 解压
4tar xf apache-pulsar-2.7.1-bin.tar.gz -C /opt

3.部署ZooKeeper

1. 配置zookeeper

如无特别说明的在所有zookeeper节点执行,有说明的只在指定节点执行 pulsar安装包已经自带了zookeeper,不需要额外下载zookeeper

 1cd /opt/apache-pulsar-2.7.1
 2mkdir -p data/zookeeper
 3# zk1执行
 4echo 1 > data/zookeeper/myid
 5# zk2执行
 6echo 2 > data/zookeeper/myid
 7# zk3执行
 8echo 3 > data/zookeeper/myid
 9
10echo '
11server.1=192.168.2.159:2888:3888
12server.2=192.168.2.160:2888:3888
13server.3=192.168.2.161:2888:3888
14' >> conf/zookeeper.conf

2. 运行zookeeper

1bin/pulsar-daemon start zookeeper

3. 初始化元数据信息

1bin/pulsar initialize-cluster-metadata \
2  --cluster pulsar-cluster-zk \
3  --zookeeper 192.168.2.159:2181,192.168.2.160:2181,192.168.2.161:2181 \
4  --configuration-store 192.168.2.159:2181,192.168.2.160:2181,192.168.2.161:2181 \
5  --web-service-url http://192.168.2.162:8080,192.168.2.163:8080,192.168.2.164:8080 \
6  --web-service-url-tls https://192.168.2.162:8443,192.168.2.163:8443,192.168.2.164:8443 \
7  --broker-service-url pulsar://192.168.2.162:6650,192.168.2.163:6650,192.168.2.164:6650 \
8  --broker-service-url-tls pulsar+ssl://192.168.2.162:6651,192.168.2.163:6651,192.168.2.164:6651

4. 部署BookKeeper

1. 配置bookkeeper

在bookkeeper节点执行

1cd /opt/apache-pulsar-2.7.1
2mkdir -p data/bookkeeper/{journal,ledgers}
3# 修改zookeeper连接信息
4sed -i '/^zkServers/s/localhost:2181/192.168.2.159:2181,192.168.2.160:2181,192.168.2.161:2181/' conf/bookkeeper.conf
5# 修改advertisedAddress地址
6my_ip=$(ip addr show $(ip route | grep default |awk '{print $NF}') | grep inet | grep -v inet6 | awk '{print $2}' | awk -F'/' '{print $1}')
7sed -i "/^advertisedAddress=/s/.*/advertisedAddress=${my_ip}/" conf/bookkeeper.conf

2. 初始化元数据信息

在任意一个bookkeeper节点执行一次就可以了

1bin/bookkeeper shell metaformat

看到Successfully formatted BookKeeper metadata提示信息,说明初始化元数据信息成功。

111:15:50.423 [main] INFO  org.apache.bookkeeper.discover.ZKRegistrationManager - Successfully formatted BookKeeper metadata
211:15:50.536 [main] INFO  org.apache.zookeeper.ZooKeeper - Session: 0x200002a92940001 closed
311:15:50.536 [main-EventThread] INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x200002a92940001

3. 运行bookkeeper

1bin/pulsar-daemon start bookie

4. 验证是否启动成功

1bin/bookkeeper shell bookiesanity

正常情况下会得到以下输出:

1...
211:23:04.427 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand - Bookie sanity test succeeded

5. 部署Broker

1. 配置broker

在所有broker节点执行

1sed -i '/^zookeeperServers=/s/.*/zookeeperServers=192.168.2.159:2181,192.168.2.160:2181,192.168.2.161:2181/' conf/broker.conf
2sed -i '/^configurationStoreServers=/s/.*/configurationStoreServers=192.168.2.159:2181,192.168.2.160:2181,192.168.2.161:2181/' conf/broker.conf
3sed -i '/^clusterName=/s/.*/clusterName=pulsar-cluster-zk/' conf/broker.conf
4# 修改advertisedAddress地址
5my_ip=$(ip addr show $(ip route | grep default |awk '{print $NF}') | grep inet | grep -v inet6 | awk '{print $2}' | awk -F'/' '{print $1}')
6sed -i "/^advertisedAddress=/s/.*/advertisedAddress=${my_ip}/" conf/broker.conf

2. 运行broker

1bin/pulsar-daemon start broker

3. 检查broker节点情况

1bin/pulsar-admin brokers list pulsar-cluster

正常情况下会得到一下输出

1"192.168.2.162:8080"
2"192.168.2.163:8080"
3"192.168.2.164:8080"

6. 测试

1. 修改client.conf配置文件

1sed -i '/^webServiceUrl/s/localhost:8080/192.168.2.162:8080,192.168.2.163:8080,192.168.2.164:8080/' conf/client.conf
2sed -i '/^brokerServiceUrl/s/localhost:6650/192.168.2.162:6650,192.168.2.163:6650,192.168.2.164:6650/' conf/client.conf

2. 测试生产消息

1bin/pulsar-client produce \
2  persistent://public/default/test \
3  -n 1 \
4  -m "Hello Pulsar"

正常输出

112:01:33.612 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced

3. 测试消费消息

PS: 需要先启动消费端

1bin/pulsar-client consume \
2  persistent://public/default/test \
3  -n 100 \
4  -s "consumer-test" \
5  -t "Exclusive"

得到结果

1----- got message -----
2key:[null], properties:[], content:Hello Pulsar

测试好生产和消费都没有问题,至此Pulsar集群的安装部署就到这里了。