当前位置:首页 > Windows程序 > 正文

Kafka 高级API 实战

2024-03-31 Windows程序

CDH 5.16.1
kafka版本 2.1.0-kafka-4.0.0

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0-kafka-4.0.0</version>
</dependency>

2.生产者

2.1 生产者,带回调函数

package com.monk.kafka;

import com.monk.utils.PropertiesUtil;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Producer thread that publishes an endless stream of numbered String messages
 * and reports the outcome of every send through a completion callback.
 *
 * @author wu ning
 * @className: NewKafkaProducer
 * @date 2019/12/15 15:21
 */
public class NewKafkaProducer extends Thread {

    /** Topic every record is published to. */
    private final String topic;

    /** Broker list, looked up under the "bootstrap.servers" key of PropertiesUtil. */
    private final String bootstrapServer = PropertiesUtil.getProperties("bootstrap.servers");

    /** Typed as String/String so the generics agree with the configured String serializers. */
    private final KafkaProducer<String, String> producer;

    /**
     * Builds a producer with acks=1 and String serializers for key and value.
     *
     * @param topic topic the thread publishes to
     */
    public NewKafkaProducer(String topic) {
        this.topic = topic;

        Properties prop = new Properties();
        prop.put("bootstrap.servers", bootstrapServer);
        prop.put("acks", "1");
        // key serializer
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(prop);
    }

    /**
     * Sends "new_kafkaproducer ==> n" for n = 1, 2, 3, ... without a key until
     * an exception escapes the loop; the producer is closed on the way out.
     */
    @Override
    public void run() {
        int i = 1;
        try {
            while (true) {
                // Publish to the topic handed to the constructor (previously the
                // argument was ignored and "kafka.topic" was re-read on every send).
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topic, "new_kafkaproducer ==> " + i);

                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception != null) {
                        System.out.println("发送失败");
                        // Surface the cause instead of discarding it.
                        System.err.println(exception);
                    } else {
                        System.out.println("offset:" + metadata.offset());
                        System.out.println("partition:" + metadata.partition());
                    }
                });
                i++;
            }
        } finally {
            // Release the producer if send() ever throws out of the loop.
            producer.close();
        }
    }
}

2.2 生产者,自定义分区

1.实现Partitioner接口,重写里面的方法

package com.monk.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Minimal custom partitioner for the example: every record is assigned to
 * partition 0, whatever its topic, key or value.
 *
 * @author wu ning
 * @className: CustomPartitioner
 * @date 2019/12/15 18:31
 */
public class CustomPartitioner implements Partitioner {

    /**
     * @return always 0; none of the arguments are inspected
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /** No configuration is read. */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

2.在代码中指定自定义分区类

// register the custom partitioner
prop.put("partitioner.class", "com.monk.kafka.CustomPartitioner");

package com.monk.kafka;

import com.monk.utils.PropertiesUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Producer thread that sends ten numbered messages through a producer
 * configured with {@code com.monk.kafka.CustomPartitioner}, then closes it.
 *
 * @author wu ning
 * @className: NewKafkaProducerWithPartitioner
 * @date 2019/12/15 18:33
 */
public class NewKafkaProducerWithPartitioner extends Thread {

    /** Topic every record is published to. */
    private final String topic;

    /** Broker list, looked up under the "bootstrap.servers" key of PropertiesUtil. */
    private final String bootstrapServer = PropertiesUtil.getProperties("bootstrap.servers");

    /** Typed as String/String so the generics agree with the configured String serializers. */
    private final KafkaProducer<String, String> producer;

    /**
     * Builds a producer with acks=1, String serializers and the custom partitioner.
     *
     * @param topic topic the thread publishes to
     */
    public NewKafkaProducerWithPartitioner(String topic) {
        this.topic = topic;

        Properties prop = new Properties();
        prop.put("bootstrap.servers", bootstrapServer);
        prop.put("acks", "1");
        // key serializer
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // register the custom partitioner
        prop.put("partitioner.class", "com.monk.kafka.CustomPartitioner");

        producer = new KafkaProducer<>(prop);
    }

    /**
     * Sends "NewKafkaProducerWithPartitioner ==> n" for n = 1..10 and prints the
     * offset and partition reported for each acknowledged record.
     */
    @Override
    public void run() {
        try {
            for (int i = 1; i <= 10; i++) {
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topic, "NewKafkaProducerWithPartitioner ==> " + i);

                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception != null) {
                        System.out.println("发送失败");
                        // Surface the cause instead of discarding it.
                        System.err.println(exception);
                    } else {
                        System.out.println("offset:" + metadata.offset());
                        System.out.println("partition:" + metadata.partition());
                    }
                });
            }
        } finally {
            // Close even when a send throws, so the producer is never leaked.
            producer.close();
        }
    }
}

3. 
消费者

3.1 消费者,简单模式

package com.monk.kafka;

import com.monk.utils.PropertiesUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumer thread that subscribes to one topic as group "test1" and prints the
 * value of every record it receives.
 *
 * @author wu ning
 * @className: NewKafkaConsumer
 * @date 2019/12/15 18:48
 */
public class NewKafkaConsumer extends Thread {

    /** Longest time a single poll() call waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

    /** Topic the consumer subscribes to. */
    private final String topic;

    /** Broker list, looked up under the "bootstrap.servers" key of PropertiesUtil. */
    private final String bootstrapServer = PropertiesUtil.getProperties("bootstrap.servers");

    /** Typed as String/String so the generics agree with the configured String deserializers. */
    private final KafkaConsumer<String, String> kafkaConsumer;

    /**
     * Builds an auto-committing consumer that starts from the earliest offset
     * when the group has no committed position.
     *
     * @param topic topic to subscribe to
     */
    public NewKafkaConsumer(String topic) {
        this.topic = topic;

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        // consumer group id
        props.put("group.id", "test1");
        // commit offsets automatically
        props.put("enable.auto.commit", "true");
        // auto-commit interval in ms (may lead to records being consumed twice)
        props.put("auto.commit.interval.ms", "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // key/value deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        kafkaConsumer = new KafkaConsumer<>(props);
    }

    /**
     * Subscribes to the topic and polls forever, printing each record value.
     * The consumer is closed if an exception ends the loop.
     */
    @Override
    public void run() {
        try {
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // wait up to 100 ms for records (poll(long) is deprecated since Kafka 2.0)
                ConsumerRecords<String, String> records = kafkaConsumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("==> " + record.value());
                }
            }
        } finally {
            // Release the consumer when poll() or subscribe() throws.
            kafkaConsumer.close();
        }
    }
}

3.2 消费者,消费指定partition,定位offset

// consume one specific partition
kafkaConsumer.assign(Collections.singleton(topicPartition));
// start from a specific offset
kafkaConsumer.seek(topicPartition,3985);

package com.monk.kafka;

import com.monk.utils.PropertiesUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Consumer thread that is manually assigned partition 0 of a topic, seeks to a
 * fixed offset and prints the value, partition and offset of every record.
 *
 * @author wu ning
 * @className: NewKafkaConsumerWithPartition
 * @date 2019/12/15 19:14
 */
public class NewKafkaConsumerWithPartition extends Thread {

    /** Partition this consumer is assigned to. */
    private static final int PARTITION = 0;

    /** Offset the consumer seeks to before the first poll. */
    private static final long START_OFFSET = 3985L;

    /** Longest time a single poll() call waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

    /** Topic whose partition is consumed. */
    private final String topic;

    /** Broker list, looked up under the "bootstrap.servers" key of PropertiesUtil. */
    private final String bootstrapServer = PropertiesUtil.getProperties("bootstrap.servers");

    /** Typed as String/String so the generics agree with the configured String deserializers. */
    private final KafkaConsumer<String, String> kafkaConsumer;

    /**
     * Builds an auto-committing consumer in group "test11".
     *
     * @param topic topic whose partition 0 will be consumed
     */
    public NewKafkaConsumerWithPartition(String topic) {
        this.topic = topic;

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        // consumer group id
        props.put("group.id", "test11");
        // commit offsets automatically
        props.put("enable.auto.commit", "true");
        // auto-commit interval in ms (may lead to records being consumed twice)
        props.put("auto.commit.interval.ms", "1000");
        // key/value deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        kafkaConsumer = new KafkaConsumer<>(props);
    }

    /**
     * Assigns the partition, seeks to the start offset and polls forever.
     * The consumer is closed if an exception ends the loop.
     */
    @Override
    public void run() {
        try {
            TopicPartition topicPartition = new TopicPartition(topic, PARTITION);
            // consume one specific partition
            kafkaConsumer.assign(Collections.singleton(topicPartition));
            // start from a specific offset
            kafkaConsumer.seek(topicPartition, START_OFFSET);

            while (true) {
                // wait up to 100 ms for records (poll(long) is deprecated since Kafka 2.0)
                ConsumerRecords<String, String> records = kafkaConsumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("==> " + record.value());
                    System.out.println("partition ==> " + record.partition());
                    System.out.println("offset ==> " + record.offset());
                }
            }
        } finally {
            // Release the consumer when assign(), seek() or poll() throws.
            kafkaConsumer.close();
        }
    }
}

Kafka 高级API 实战

标签:

原文地址:https://www.cnblogs.com/wuning/p/12044499.html

温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/45068.html

Jm-杰米博客Jamie
草根站长的技术交流乐园!IT不会不要紧快来好好学习吧!
  • 20786文章总数
  • 7494595访问次数
  • 建站天数
  • 友情链接