There is Kafka Topic called "WebMessages" on 2 partions.

We have two consumer group in same server but in different site on IIS.

One of the consumer cannot receive messages. The other one missed most of the messages.

When I write simple consumer on my local computer, I also missed some messages. Any idea what's going wrong ?

Here is the producer code :

_producerConfig = new ProducerConfig {
                    BootstrapServers = _addressWithPort,
                    Acks = Acks.All
                };

using (var p = new ProducerBuilder<string, string>(_producerConfig).Build())
                {
                    p.ProduceAsync(_topicName, new Message<string, string>
                    {
                        Key = ldtoKafkaMessage.Key,
                        Value = ldtoKafkaMessage.Message
                    }).ContinueWith(task =>
                    {
                        if (task.IsFaulted)
                        {
                            TraceController.TraceError(Common.Enums.TraceEventCategories.X, "Key|Message", ldtoKafkaMessage.Key + " " + ldtoKafkaMessage.Message + " " + task.Exception.Message + " " + task.Exception.InnerException+ " " + task.Exception.StackTrace);
                        }
                        else
                        {
                            TraceController.TraceInformation(Common.Enums.TraceEventCategories.X, "Key|Message|Result", ldtoKafkaMessage.Key + " " + ldtoKafkaMessage.Message + " " + "Success");
                        }
                    });
                }

So I make sure that I sent messages to producer.

Here is the consumer code.

     _consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _addressWithPort,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            GroupId = consumerGroupId
        };

  using (var c = new ConsumerBuilder<string, string>(_consumerConfig).Build())
            {
                c.Subscribe(_topicName);

                CancellationTokenSource cts = new CancellationTokenSource();
                try
                {
                    while (!cts.IsCancellationRequested)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            LdtoKafkaMessage.Key = cr.Key;
                            LdtoKafkaMessage.Message = cr.Value;
                            this.OnMessageChanged();
                        }
                        catch (ConsumeException e)
                        {

                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    c.Close();
                }
            }

Some of your messages might not be produced, because you dispose producer and don't wait for it to finish. You can ensure that messages are delivered by using await keyword on ProduceAsync method or calling Flush() before disposing the producer.


要缩小问题范围,您应该首先检查是否所有消息都写入您的主题。也许是制作者的错?您可以从kafka.apache.org/downloads下载 kafka 工具并用于./kafka-consumer-groups.bat --bootstrap-server BOOTSTRAP_SERVER --describe --group CONSUMER_GROUP检查偏移量

我们深入研究了这个主题。看起来丢失的消息也不在主题中。问题看起来与生产者有关。谢谢@rytisk

相关:stackoverflow.com/questions/63950185 /...