Kafka简单示例
Kafka简单示例
Kafka
安装
首先需要安装Java
sudo apt install openjdk-11-jdk
下载安装kafka
wget https://dlcdn.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz
cd kafka_2.13-3.4.0
启动zookeeper和kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
go连接Kafka使用
生产者
使用给定代理地址和配置创建一个同步生产者
// 使用给定代理地址和配置创建一个同步生产者
SyncProducer, err := sarama.NewSyncProducer(
[]string{conn},
config,
)
config可以自由配置:
config := sarama.NewConfig()
// 等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失败后的响应
config.Producer.Return.Successes = true
构建发送的消息:
// 构建发送的消息
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(time.Now().String()),
Value: sarama.StringEncoder(content),
}
生产者发送消息:
// SendMessage:该方法是生产者生产给定的消息
// 生产成功的时候返回该消息的分区和所在的偏移量
// 生产失败的时候返回error
partition, offset, err := SyncProducer.SendMessage(msg)
消费者
创建一个消费者的实例
config := sarama.NewConfig()
consumer, err := sarama.NewConsumer(c.Node, config)
查询这个 topic 有多少分区
partitions, err := consumer.Partitions(c.Topic)
每个分区开一个 goroutine 来消费
wg.Add(len(partitions))
// 然后每个分区开一个 goroutine 来消费
for _, partitionId := range partitions {
//不开异步会导致一个消费完才会消费另外一个
go c.consumeByPartition(consumer, c.Topic, partitionId, &wg)
}
消费
partitionConsumer, err := consumer.ConsumePartition(topic, partitionId, sarama.OffsetNewest)
// 然后可以通过partitionConsumer.Messages()打印得到的消息
主函数
func main() {
Conn := "127.0.0.1:9092"
topic := "test_log"
var wg sync.WaitGroup
wg.Add(2)
// 消费者
go func() {
defer wg.Done()
// 初始化consumer
var kafkaConsumer = consumer.KafkaConsumer{
Node: []string{Conn},
Topic: topic,
}
// 消费
go kafkaConsumer.Consume()
}()
// 生产者
go func() {
defer wg.Done()
index := 0
for {
// 生产者发送消息
_, err := producer.Send(Conn, topic, fmt.Sprintf("lox_%d", index))
if err != nil {
log.Print("测试失败:" + err.Error())
return
}
index++
time.Sleep(1 * time.Second)
}
}()
wg.Wait()
}
Kafka简单示例
https://zhangzhao219.github.io/2023/03/03/Backend/Kafka/