Kafka Installation, Deployment, and Code

Kafka Installation and Deployment

Prerequisites: JDK and ZooKeeper are installed, deployed, and start normally.

  • Download the package and extract it
tar zxvf kafka_2.11-1.0.0.tgz  -C ../servers/
  • Edit the Kafka configuration file
/export/servers/kafka_2.11-1.0.0/config/server.properties

broker.id=0  (must be unique on every node)

log.dirs=/export/servers/kafka_2.11-1.0.0/logs/

zookeeper.connect=node01:2181,node02:2181,node03:2181

delete.topic.enable=true

host.name=node01

  • Copy to the other nodes (adjust the per-node config after copying; see the example below)

scp -r kafka_2.11-1.0.0 node02:$PWD

scp -r kafka_2.11-1.0.0 node03:$PWD
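
For example, after copying, server.properties on node02 would keep everything else the same but change the two per-node values (node03 would use broker.id=2 and host.name=node03):

broker.id=1

host.name=node02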

  • Start Kafka on every node

Start ZooKeeper first:

zkstart.sh  (a self-written helper script)

Then start Kafka (run it on every node; see the script sketch after these commands):


node01: nohup ./bin/kafka-server-start.sh  config/server.properties &

node02: nohup ./bin/kafka-server-start.sh  config/server.properties &

node03: nohup ./bin/kafka-server-start.sh  config/server.properties &
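
Like zkstart.sh above, starting the brokers can be wrapped in a small helper script. A minimal sketch (the script name is hypothetical; it assumes passwordless ssh and the same install path on every node):

#!/bin/bash
# kafka-start.sh -- start a Kafka broker on each node of the cluster
for host in node01 node02 node03; do
  ssh $host "cd /export/servers/kafka_2.11-1.0.0 && nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &"
done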

 

Operating the Kafka Cluster

Create a topic

bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181  --replication-factor 2 --partitions 3 --topic 18BD34

List topics

bin/kafka-topics.sh  --list --zookeeper node01:2181,node02:2181,node03:2181

Simulate a producer and produce data

bin/kafka-console-producer.sh  --broker-list node01:9092,node02:9092,node03:9092  --topic  18BD34

--broker-list specifies the brokers that will store the data

 

Simulate a consumer and consume data

bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic 18BD34 --from-beginning

--from-beginning means consume from the beginning of the topic

--zookeeper is where the consumer's position is recorded (how far into the data consumption has progressed, i.e. the offset)


StreamAPI


import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;

public class StreamAPI {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Unique identifier for this streams application
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        // Kafka cluster to connect to
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        // Default serdes for keys and values
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Build the processing logic
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        // stream() reads from the input topic, to() writes to the output topic
        streamsBuilder.stream("demo01").mapValues(line -> line.toString().toUpperCase()).to("demo02");
        // Build the Topology (the processing graph)
        final Topology topology = streamsBuilder.build();
        // Instantiate the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(topology, props);
        // Start stream processing
        streams.start();
    }
}
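
streams.start() returns immediately and the application keeps processing until the JVM exits. A common addition (a standard Kafka Streams idiom, not part of the original listing) is a shutdown hook at the end of main() so the application closes cleanly:

        // Close the streams application cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));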

 

Producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        // Kafka cluster configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Acknowledgement setting: wait for all in-sync replicas
        props.put("acks", "all");
        // Number of retries on transient send failures
        props.put("retries", 2);
        // Batch size, in bytes
        props.put("batch.size", 16384);
        // How long to wait for more records before sending a batch, in ms
        props.put("linger.ms", 1);
        // Total memory available for buffering unsent records, in bytes
        props.put("buffer.memory", 33554432);
        // Serializers for the record key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i <= 99; i++) {
            // Send to partition 2 of topic 18BD-10 with key "rua" (assumes the topic has at least 3 partitions)
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("18BD-10", 2, "rua", "test" + i);
            kafkaProducer.send(producerRecord);
        }
        kafkaProducer.close();
    }
}
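
send() is asynchronous, so acks=all and retries only pay off if delivery results are actually checked. A minimal sketch using the send(record, Callback) overload (the error handling here is illustrative, not part of the original code); it would replace the plain send() call inside the loop above:

            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception != null) {
                    // Delivery failed even after the configured retries
                    exception.printStackTrace();
                } else {
                    System.out.println("wrote to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            });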

 

Consumer


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group
        props.put("group.id", "test");
        // Disable auto-commit: this consumer commits offsets manually via commitSync() below
        props.put("enable.auto.commit", "false");
        // Deserializers for the record key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Where to start when no committed offset exists (see the notes below)
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        // Assign specific partitions of the topic instead of subscribing
        TopicPartition topicPartition0 = new TopicPartition("18BD-50", 0);
        TopicPartition topicPartition1 = new TopicPartition("18BD-50", 2);
        kafkaConsumer.assign(Arrays.asList(topicPartition0, topicPartition1));
        // Optionally seek to a specific offset before consuming
        //kafkaConsumer.seek(topicPartition0, 0);
        kafkaConsumer.seek(topicPartition1, 10);

        while (true) {
            // poll() waits up to 100 ms for new records
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("partition: " + consumerRecord.partition());
                System.out.println("offset: " + consumerRecord.offset());
                System.out.println("key: " + consumerRecord.key());
                System.out.println("value: " + consumerRecord.value());
                System.out.println("-------------------------------");
            }
            // Commit the consumed offsets manually
            kafkaConsumer.commitSync();
        }

    }
}
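
The assign/seek pattern above pins the consumer to specific partitions. The more common alternative, sketched here under the same props, is to subscribe to the topic and let the group coordinator assign partitions (a consumer instance uses either subscribe or assign, not both):

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // The group coordinator assigns (and rebalances) partitions automatically
        consumer.subscribe(Arrays.asList("18BD-50"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition() + "/" + record.offset() + ": " + record.value());
            }
            // Commit offsets manually, as in the listing above
            consumer.commitSync();
        }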

auto.offset.reset

//earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning

//latest: if a partition has a committed offset, consume from it; otherwise consume only data produced after the consumer starts

//none: if every partition of the topic has a committed offset, consume from those offsets; if any partition lacks one, throw an exception

 

 

Kafka Reference Material

Kafka Cluster Operations

1. Create a topic

cd /export/servers/kafka_2.11-1.0.0

bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test

2. List topics

cd /export/servers/kafka_2.11-1.0.0

bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181

3. Produce data

cd /export/servers/kafka_2.11-1.0.0

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

4. Consume data

cd /export/servers/kafka_2.11-1.0.0

bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181

5. Describe a topic

cd /export/servers/kafka_2.11-1.0.0

bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test

6. Increase the number of partitions for a topic (the partition count can only be increased, never decreased)

cd /export/servers/kafka_2.11-1.0.0

bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8

7. Add a topic-level config

cd /export/servers/kafka_2.11-1.0.0

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1

8. Remove a topic-level config

cd /export/servers/kafka_2.11-1.0.0

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages

9. Delete a topic (actual deletion requires delete.topic.enable=true, as set in server.properties above)

kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName

Kafka producer configuration reference:

Properties props = new Properties();

//Kafka broker addresses

props.put("bootstrap.servers", "node01:9092");

//Acknowledgement setting

props.put("acks", "all");

//Number of retries

props.put("retries", 0);

//Batch size in bytes

props.put("batch.size", 16384);

//Send delay in ms

props.put("linger.ms", 1);

//Buffer memory in bytes

props.put("buffer.memory", 33554432);

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Kafka consumer configuration reference:

Properties props = new Properties();

//Kafka broker addresses

props.put("bootstrap.servers", "hadoop-01:9092");

//Consumer group

props.put("group.id", "test");

//The following two lines make the consumer commit offsets automatically

props.put("enable.auto.commit", "true");

//Auto-commit interval

props.put("auto.commit.interval.ms", "1000");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

