Kafka Deployment and Usage - ZooKeeper Cluster

Introduction to Kafka

Kafka is known as a next-generation distributed messaging system. It is an open-source project of the Apache Software Foundation (ASF), a non-profit organization whose projects also include HTTP Server, Hadoop, ActiveMQ, and Tomcat. Similar messaging systems include RabbitMQ, ActiveMQ, and ZeroMQ; Kafka's main advantages are its distributed design and, combined with ZooKeeper, its support for dynamic scaling.

Introductory article: https://www.infoq.cn/article/apache-kafka/

Installation environment: CentOS 7.6

Three servers. ZooKeeper cluster characteristics: the ensemble size should be odd (3, 5, 7), and the number of live servers must be greater than n/2 (a majority). For example, a 3-node ensemble tolerates 1 failure and a 5-node ensemble tolerates 2; with 4 servers, losing one is fine, but losing two leaves the cluster unusable.
IP addresses: 172.20.102.41/16, 172.20.102.48/16, 172.20.102.50/16

Download, install, and verify ZooKeeper:

Kafka download:

http://kafka.apache.org/downloads.html

ZooKeeper download:

http://zookeeper.apache.org/releases.html

Install ZooKeeper:

ZooKeeper is an open-source, distributed coordination service for distributed applications.

Upload the downloaded installation files to /usr/local/src on each server, then perform the following steps on each.

Note: deploy the Java environment in advance; this walkthrough uses JDK 8u191.
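Setting up Java itself is out of scope here; as a minimal sketch (assuming the CentOS base repositories are reachable; the original experiment uses the Oracle JDK 8u191 tarball rather than OpenJDK):

# yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# java -version    # should report 1.8.0_x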

zookeeper-1 configuration:

[root@zookeeper-1 ~]# cd /usr/local/src

[root@zookeeper-1 src]# tar xvf zookeeper-3.4.14.tar.gz

[root@zookeeper-1 src]#  ln -sv /usr/local/src/zookeeper-3.4.14 /usr/local/zookeeper

[root@zookeeper-1 src]# mkdir  /usr/local/zookeeper/data

[root@zookeeper-1 src]# cp /usr/local/zookeeper/conf/zoo_sample.cfg   /usr/local/zookeeper/conf/zoo.cfg

[root@zookeeper-1 src]# vim /usr/local/zookeeper/conf/zoo.cfg

[root@zookeeper-1 src]# grep "^[a-zA-Z]" /usr/local/zookeeper/conf/zoo.cfg 
tickTime=2000   #Heartbeat interval, in milliseconds, both between servers and between clients and servers
initLimit=5     #Number of heartbeat intervals (i.e., multiples of 2000 ms) a follower may take to initially connect to the leader
syncLimit=5     #Number of heartbeat intervals allowed for request/ack exchanges once the connection is established; a follower that cannot talk to the leader within 5*2000 ms is considered unavailable
clientPort=2181 #Port ZooKeeper listens on for client connection requests
dataDir=/usr/local/zookeeper/data   #Custom directory where ZooKeeper stores its data
autopurge.snapRetainCount=3         #Number of snapshots (and corresponding transaction logs) to retain in dataDir
autopurge.purgeInterval=1           #Purge interval, in hours, for cleaning old snapshots and logs
server.1=172.20.102.41:2888:3888    #server.ID=server IP:leader-follower data sync port:leader election port
server.2=172.20.102.48:2888:3888
server.3=172.20.102.50:2888:3888

[root@zookeeper-1 src]# echo "1" > /usr/local/zookeeper/data/myid

zookeeper-2 configuration:

All other configuration is identical to zookeeper-1; only the myid differs:
[root@zookeeper-2 src]# echo "2" > /usr/local/zookeeper/data/myid

zookeeper-3 configuration:

All other configuration is identical to zookeeper-1; only the myid differs:
[root@zookeeper-3 src]# echo "3" > /usr/local/zookeeper/data/myid
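Before starting the ensemble, it is worth confirming that each node's myid matches its server.N entry in zoo.cfg. A quick sketch, assuming root SSH access between the nodes (IPs as above):

[root@zookeeper-1 ~]# for ip in 172.20.102.41 172.20.102.48 172.20.102.50; do ssh root@$ip 'hostname; cat /usr/local/zookeeper/data/myid'; done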

Start ZooKeeper on each server:

Note: do not leave a long gap between starting the servers; ideally start them all at roughly the same time.
[root@zookeeper-1 ~]# /usr/local/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

[root@zookeeper-2 src]# /usr/local/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

[root@zookeeper-3 src]# /usr/local/zookeeper/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

Check the status of each ZooKeeper node:

[root@zookeeper-1 ~]# /usr/local/zookeeper/bin/zkServer.sh  status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

[root@zookeeper-2 src]# /usr/local/zookeeper/bin/zkServer.sh  status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

[root@zookeeper-3 src]# /usr/local/zookeeper/bin/zkServer.sh  status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
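The same information is available remotely through ZooKeeper's four-letter-word commands, which the 3.4.x series answers on the client port by default. A quick sketch, assuming nc (nmap-ncat) is installed:

[root@zookeeper-1 ~]# echo stat | nc 172.20.102.50 2181 | grep Mode
Mode: leader
[root@zookeeper-1 ~]# echo ruok | nc 172.20.102.41 2181    # a healthy node replies "imok"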

Basic ZooKeeper commands:

[root@zookeeper-1 ~]# /usr/local/zookeeper/bin/zkCli.sh -server 172.20.102.48:2181
WatchedEvent state:SyncConnected type:None path:null
create /zookeeper-test "hello"
Created /zookeeper-test
[zk: 172.20.102.48:2181(CONNECTED) 1]

# Verify from another node:
[root@zookeeper-1 ~]# /usr/local/zookeeper/bin/zkCli.sh -server 172.20.102.50:2181
WatchedEvent state:SyncConnected type:None path:null
get /zookeeper-test
hello
cZxid = 0x100000002
ctime = Sat Jul 27 20:49:34 CST 2019
mZxid = 0x100000002
mtime = Sat Jul 27 20:49:34 CST 2019
pZxid = 0x100000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
[zk: 172.20.102.50:2181(CONNECTED) 1] 
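Beyond create and get, zkCli.sh offers the usual CRUD operations; a brief sketch of the common ones (run inside the same zkCli session):

ls /                          # list child znodes of the root
set /zookeeper-test "world"   # update the znode's data (increments dataVersion)
delete /zookeeper-test        # remove the znode (it must have no children)
quit                          # leave the shell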

Install and test Kafka:

Broker:
  • A Kafka cluster consists of one or more servers, each called a broker.
Topic:
  • Every message published to a Kafka cluster belongs to a category called a topic.

  • (Physically, messages of different topics are stored separately; logically, a topic's messages may live on one or more brokers, but users only specify the topic when producing or consuming data, without caring where the data is stored.)

Partition:
  • A partition is a physical concept; each topic contains one or more partitions.

  • The number of partitions can be specified when a topic is created.

Each partition corresponds to a directory that stores that partition's data and index files.

Producer:
  • Publishes messages to Kafka brokers.
Consumer:
  • Consumes messages; each consumer belongs to a specific consumer group.

  • (A group name can be assigned to each consumer; consumers without an explicit group name belong to the default group.)

  • With the consumer high-level API, a given message in a topic can be consumed by only one consumer within the same consumer group,

  • but multiple consumer groups can consume that message simultaneously (see the sketch after this list).
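The consumer-group semantics above can be observed with nothing but the console consumer. A sketch, assuming a topic named grouptest has already been created (the topic and group names here are made up for illustration): start two consumers with the same --group on different servers and one with a different group; each produced message reaches only one consumer in g1, while the g2 consumer receives every message as well.

[root@zookeeper-1 src]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.102.41:9092 --topic grouptest --group g1
[root@zookeeper-2 src]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.102.48:9092 --topic grouptest --group g1
[root@zookeeper-3 src]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.102.50:9092 --topic grouptest --group g2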

Install Kafka on zookeeper-1:

[root@zookeeper-1 src]#  tar xvf kafka_2.12-2.1.0.tgz

[root@zookeeper-1 src]# ln -sv /usr/local/src/kafka_2.12-2.1.0 /usr/local/kafka

[root@zookeeper-1 src]# vim /usr/local/kafka/config/server.properties
21 broker.id=1                                  #Globally unique integer ID for each broker
31 listeners=PLAINTEXT://172.20.102.41:9092     #Listen address
103 log.retention.hours=168                     #Log retention time in hours; 168 = one week
123 zookeeper.connect=172.20.102.41:2181,172.20.102.48:2181,172.20.102.50:2181  #All ZooKeeper addresses

Install Kafka on zookeeper-2 (extract and symlink as on zookeeper-1, then edit the configuration):

[root@zookeeper-2 src]# vim /usr/local/kafka/config/server.properties
21 broker.id=2
31 listeners=PLAINTEXT://172.20.102.48:9092
103 log.retention.hours=168
123 zookeeper.connect=172.20.102.41:2181,172.20.102.48:2181,172.20.102.50:2181

Install Kafka on zookeeper-3 (extract and symlink as on zookeeper-1, then edit the configuration):

[root@zookeeper-3 src]# vim /usr/local/kafka/config/server.properties
21 broker.id=3
31 listeners=PLAINTEXT://172.20.102.50:9092
103 log.retention.hours=168
123 zookeeper.connect=172.20.102.41:2181,172.20.102.48:2181,172.20.102.50:2181

Start Kafka on each node:

Start Kafka on zookeeper-1:

[root@zookeeper-1 src]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties    #start as a daemon

Verify the Kafka startup status: tail -f /usr/local/kafka/logs/server.log

Start Kafka on zookeeper-2:

[root@zookeeper-2 src]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

Start Kafka on zookeeper-3:

[root@zookeeper-3 src]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

# Started this way, Kafka shuts down when the shell session ends
# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

Test Kafka:

Verify the processes:
[root@zookeeper-1 src]# jps
24388 Jps
21719 QuorumPeerMain
23997 Kafka

[root@zookeeper-2 src]# jps
21088 QuorumPeerMain
23312 Kafka
23744 Jps

[root@zookeeper-3 src]# jps
21121 QuorumPeerMain
21186 -- process information unavailable
23351 Kafka
23787 Jps
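Besides jps, you can confirm that all three brokers registered themselves in ZooKeeper: each running broker creates an ephemeral znode under /brokers/ids named after its broker.id. A quick check via zkCli:

[root@zookeeper-1 src]# /usr/local/zookeeper/bin/zkCli.sh -server 172.20.102.41:2181
ls /brokers/ids
[1, 2, 3]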

Test topic creation:

Create a topic named logstashtest with 3 partitions and a replication factor of 3.
Run on any Kafka server:
[root@zookeeper-1 src]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.20.102.41:2181,172.20.102.48:2181,172.20.102.50:2181 --partitions 3 --replication-factor 3 --topic logstashtest
Created topic "logstashtest".

Test describing the topic:

This can be run on any Kafka server:
[root@zookeeper-2 src]# /usr/local/kafka/bin/kafka-topics.sh  --describe --zookeeper 172.20.102.41:2181,172.20.102.48:2181,172.20.102.50:2181 --topic logstashtest

Status explanation: logstashtest has three partitions, numbered 0, 1, and 2. The leader of partition 0 is broker 3 (its broker.id); partition 0 has three replicas, all listed in Isr (the in-sync replica set, meaning they are eligible to be elected leader).
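For reference, the describe output in this Kafka version looks roughly like the following; the partition-0 line matches the description above, while the leaders and replica orderings of the other partitions are illustrative:

Topic:logstashtest	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: logstashtest	Partition: 0	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
	Topic: logstashtest	Partition: 1	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
	Topic: logstashtest	Partition: 2	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1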

Delete the topic:

[root@zookeeper-2 src]# /usr/local/kafka/bin/kafka-topics.sh  --delete --zookeeper 172.20.102.41:2181,172.20.102.48:2181,172.20.102.50:2181 --topic logstashtest
Topic logstashtest is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# Describe the topic again; no output means it has been deleted
[root@zookeeper-2 src]# /usr/local/kafka/bin/kafka-topics.sh  --describe --zookeeper 172.20.102.41:2181,172.20.102.48:2181,172.20.102.50:2181 --topic logstashtest
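Note that in Kafka 1.0 and later, delete.topic.enable defaults to true, which is why the topic is really gone here; on clusters where it has been turned off, the delete command only marks the topic, and you would have to enable the option in server.properties on every broker and restart:

delete.topic.enable=true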

List all topics:

[root@zookeeper-1 src]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 172.20.102.41:2181,172.20.102.48:2181,172.20.102.50:2181

Test sending messages with the Kafka CLI:

Create a topic:
[root@zookeeper-1 src]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.20.102.41:2181,172.20.102.48:2181,172.20.102.50:2181 --partitions 3 --replication-factor 3 --topic messagetest
Created topic "messagetest".
Send messages (each line typed after the producer starts is sent as one message):
[root@zookeeper-1 src]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.20.102.41:9092,172.20.102.48:9092,172.20.102.50:9092 --topic messagetest
Created topic "messagetest".
Consume the messages from the other Kafka servers:

Consume on zookeeper-1:

[root@zookeeper-1 src]# /usr/local/kafka/bin/kafka-console-consumer.sh  --topic messagetest --bootstrap-server 172.20.102.41:9092 --from-beginning

Consume on zookeeper-2:

[root@zookeeper-2 src]# /usr/local/kafka/bin/kafka-console-consumer.sh  --topic messagetest --bootstrap-server 172.20.102.48:9092 --from-beginning

Consume on zookeeper-3:

[root@zookeeper-3 src]# /usr/local/kafka/bin/kafka-console-consumer.sh  --topic messagetest --bootstrap-server 172.20.102.50:9092 --from-beginning

Test writing data to Kafka with Logstash:

Edit the Logstash configuration file:
# vim /etc/logstash/conf.d/logstash-to-kafka.conf
input {
    stdin {}
}

output {
    kafka {
        topic_id => "hello"
        bootstrap_servers => "172.20.102.41:9092"
        batch_size => "5"
    }
    stdout {
        codec => "rubydebug"
    }
}
Start Logstash in the foreground to write data to Kafka:
[root@logstash ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-to-kafka.conf
welcome to kafka
{
          "host" => "logstash.martin.com",
      "@version" => "1",
       "message" => "welcome to kafka",
    "@timestamp" => 2019-07-28T05:45:58.567Z
}
this is kafka cluster
{
          "host" => "logstash.martin.com",
      "@version" => "1",
       "message" => "this is kafka cluster",
    "@timestamp" => 2019-07-28T05:46:29.161Z
}

Verify that Kafka received the Logstash data:

[root@zookeeper-1 ~]#  /usr/local/kafka/bin/kafka-console-consumer.sh  --topic hello --bootstrap-server 172.20.102.41:9092 --from-beginning
2019-07-28T05:45:58.567Z logstash.martin.com welcome to kafka
2019-07-28T05:46:29.161Z logstash.martin.com this is kafka cluster

Use Kafka Tool to connect to the servers and inspect the data:
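Kafka Tool is a GUI client; as a rough guide (menu names vary between versions), add a new cluster connection and point its ZooKeeper host/port at any ensemble member, e.g. 172.20.102.41:2181; the cluster's brokers, topics, partitions, and messages can then be browsed graphically.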

Install the Logstash JDBC plugin and import it offline to a host without internet access

Install gem:
# yum install gem

Replace the default gem source with a mirror:
# gem sources --remove https://rubygems.org/ &&  gem sources --add https://gems.ruby-china.com/

Verify that only the one source remains:
# gem sources -l
*** CURRENT SOURCES ***

https://gems.ruby-china.com/

Install the plugin:
[root@martinhe logstash]# bash bin/logstash-plugin install logstash-input-jdbc

Back up the plugin:
# cp -a /usr/local/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-output-jdbc-5.4.0/* ./logstash-output-jdbc-5.4.0
# cp /usr/local/logstash/vendor/bundle/jruby/2.3.0/specifications/logstash-output-jdbc-5.4.0.gemspec  ./
# tar czvf logstash-output-jdbc-5.4.0.tar.gz ./*

Copy the archive to the intranet host and place the directories and files into their corresponding locations under /usr/local/logstash.
Edit the Gemfile in the Logstash directory and append one line:
gem "logstash-output-jdbc"

Verify that the plugin is now present:
# ./bin/logstash-plugin list
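To check for the plugin specifically instead of scanning the whole list, pipe the same command through grep:

# ./bin/logstash-plugin list | grep jdbc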