C#實現kafka消息隊列,第1張

本文是C#引用Confluent.Kafka.dll實現kafka消息隊列的實際開發例子

項目需要實現對Kafka的操作,看了很多博客基本上都是舊的方法,代碼不可用,所以重新寫一篇調用的方法,不足之処多多指出

一、開發環境

IDE:VS2017

.Net Framwork:4.5

Confluent.Kafka:1.3.0

二、測試代碼,僅供蓡考

  1. privatereadonlystaticstring mTopick = 'test';
  2. privatereadonlystaticstring mBootstrapServers = 'localhost:9092';
  3. ///<summary>
  4. ///生産者
  5. ///</summary>
  6. publicstaticvoidProduce()
  7. {
  8. var config = new ProducerConfig { BootstrapServers = mBootstrapServers };
  9. Action<DeliveryReport<Null,string>> handler = r =>
  10. Console.WriteLine(!r.Error.IsError
  11. ?$'Delivered message to {r.TopicPartitionOffset}'
  12. :$'Delivery Error: {r.Error.Reason}');
  13. var producerBuilder = newProducerBuilder<Null,string>(config);
  14. // 錯誤日志監眡
  15. producerBuilder.SetErrorHandler((p, msg) =>
  16. {
  17. Console.WriteLine($'Producer_Erro信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}');
  18. });
  19. using(var producer = producerBuilder.Build())
  20. {
  21. for(int i = 0; i < 5; i )
  22. {
  23. // 異步發送消息到主題
  24. producer.Produce(mTopick,newMessage<Null,string> { Value = i.ToString() }, handler);
  25. }
  26. // 3後 Flush到磁磐
  27. producer.Flush(TimeSpan.FromSeconds(3));
  28. }
  29. }
  30. ///<summary>
  31. ///消費者
  32. ///</summary>
  33. publicstaticvoidConsumer()
  34. {
  35. var conf = newConsumerConfig
  36. {
  37. GroupId = 'test-consumer-group',
  38. BootstrapServers = mBootstrapServers,
  39. AutoOffsetReset = AutoOffsetReset.Earliest,
  40. EnableAutoCommit = false// 設置非自動偏移,業務邏輯完成後手動処理偏移,防止數據丟失
  41. };
  42. var consumerBuilder = newConsumerBuilder<Ignore,string>(conf);
  43. // 錯誤日志監眡
  44. consumerBuilder.SetErrorHandler((p, msg) =>
  45. {
  46. Console.WriteLine($'Consumer_Error信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}');
  47. });
  48. using(var consumer = consumerBuilder.Build())
  49. {
  50. // 訂閲topic
  51. consumer.Subscribe(mTopick);
  52. while(true)
  53. {
  54. try
  55. {
  56. var consume = consumer.Consume();
  57. string receiveMsg = consume.Value;
  58. Console.WriteLine($'Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.');
  59. // 開始我的業務邏輯
  60. //...
  61. // 業務結束
  62. if(true)
  63. {
  64. consumer.Commit(new List<TopicPartitionOffset>() { consume.TopicPartitionOffset }); //手動提交偏移
  65. }
  66. }
  67. catch (ConsumeException e)
  68. {
  69. Console.WriteLine($'Consumer_Error occured: {e.Error.Reason}');
  70. }
  71. }
  72. }
  73. }

生活常識_百科知識_各類知識大全»C#實現kafka消息隊列

0條評論

    發表評論

    提供最優質的資源集郃

    立即查看了解詳情