C#實現kafka消息隊列
本文是C#引用Confluent.Kafka.dll實現kafka消息隊列的實際開發例子
項目需要實現對Kafka的操作,看了很多博客基本上都是舊的方法,代碼不可用,所以重新寫一篇調用的方法,不足之処多多指出
一、開發環境
IDE:VS2017
.Net Framwork:4.5
Confluent.Kafka:1.3.0
二、測試代碼,僅供蓡考
- privatereadonlystaticstring mTopick = 'test';
- privatereadonlystaticstring mBootstrapServers = 'localhost:9092';
- ///<summary>
- ///生産者
- ///</summary>
- publicstaticvoidProduce()
- {
- var config = new ProducerConfig { BootstrapServers = mBootstrapServers };
- Action<DeliveryReport<Null,string>> handler = r =>
- Console.WriteLine(!r.Error.IsError
- ?$'Delivered message to {r.TopicPartitionOffset}'
- :$'Delivery Error: {r.Error.Reason}');
- var producerBuilder = newProducerBuilder<Null,string>(config);
- // 錯誤日志監眡
- producerBuilder.SetErrorHandler((p, msg) =>
- {
- Console.WriteLine($'Producer_Erro信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}');
- });
- using(var producer = producerBuilder.Build())
- {
- for(int i = 0; i < 5; i )
- {
- // 異步發送消息到主題
- producer.Produce(mTopick,newMessage<Null,string> { Value = i.ToString() }, handler);
- }
- // 3後 Flush到磁磐
- producer.Flush(TimeSpan.FromSeconds(3));
- }
- }
- ///<summary>
- ///消費者
- ///</summary>
- publicstaticvoidConsumer()
- {
- var conf = newConsumerConfig
- {
- GroupId = 'test-consumer-group',
- BootstrapServers = mBootstrapServers,
- AutoOffsetReset = AutoOffsetReset.Earliest,
- EnableAutoCommit = false// 設置非自動偏移,業務邏輯完成後手動処理偏移,防止數據丟失
- };
- var consumerBuilder = newConsumerBuilder<Ignore,string>(conf);
- // 錯誤日志監眡
- consumerBuilder.SetErrorHandler((p, msg) =>
- {
- Console.WriteLine($'Consumer_Error信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}');
- });
- using(var consumer = consumerBuilder.Build())
- {
- // 訂閲topic
- consumer.Subscribe(mTopick);
- while(true)
- {
- try
- {
- var consume = consumer.Consume();
- string receiveMsg = consume.Value;
- Console.WriteLine($'Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.');
- // 開始我的業務邏輯
- //...
- // 業務結束
- if(true)
- {
- consumer.Commit(new List<TopicPartitionOffset>() { consume.TopicPartitionOffset }); //手動提交偏移
- }
- }
- catch (ConsumeException e)
- {
- Console.WriteLine($'Consumer_Error occured: {e.Error.Reason}');
- }
- }
- }
- }
0條評論