diff --git a/internal/consumer.go b/internal/consumer.go index 8bf6707..5917e6c 100644 --- a/internal/consumer.go +++ b/internal/consumer.go @@ -4,10 +4,11 @@ import ( "context" "errors" "fmt" - "github.com/IBM/sarama" - "github.com/violetpay-org/queue-streamer/common" "sync" "time" + + "github.com/IBM/sarama" + "github.com/violetpay-org/queue-streamer/common" ) var transactionalId int32 = 0 @@ -234,7 +235,9 @@ func (consumer *StreamConsumer) HandleTxnError(producer sarama.AsyncProducer, me func (consumer *StreamConsumer) handleTxnError(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, err error, defaulthandler func() error) { fmt.Printf("Message consumer: unable to process transaction: %+v", err) - for { + retryCount := 0 + maxRetries := 30 + for retryCount < maxRetries { if producer.TxnStatus()&sarama.ProducerTxnFlagFatalError != 0 { // fatal error. need to recreate producer. fmt.Println("Message consumer: producer is in a fatal state, need to recreate it") @@ -258,4 +261,7 @@ func (consumer *StreamConsumer) handleTxnError(producer sarama.AsyncProducer, me return } } + if retryCount == maxRetries { + fmt.Println("Error: failed to commit transaction after", maxRetries, "retries") + } } diff --git a/internal/consumer_test.go b/internal/consumer_test.go index c872cba..b0994bb 100644 --- a/internal/consumer_test.go +++ b/internal/consumer_test.go @@ -3,12 +3,13 @@ package internal_test import ( "context" "errors" + "testing" + "time" + "github.com/IBM/sarama" "github.com/stretchr/testify/assert" "github.com/violetpay-org/queue-streamer/common" "github.com/violetpay-org/queue-streamer/internal" - "testing" - "time" ) var cbrokers = []string{"localhost:9093"} @@ -136,6 +137,7 @@ func TestStreamConsumer_ConsumeClaim(t *testing.T) { msg.DataChan = make(chan *sarama.ConsumerMessage, 1) go func() { + defer close(msg.DataChan) time.Sleep(1 * time.Second) msg.DataChan <- &sarama.ConsumerMessage{ Topic: "test", @@ -145,7 +147,6 @@ func TestStreamConsumer_ConsumeClaim(t *testing.T) { Offset: 0, } time.Sleep(1 * time.Second) - close(msg.DataChan) }() assert.Equal(t, 0, len(consumer.ProducerPool().Producers())) diff --git a/internal/utils.go b/internal/utils.go index fcb1144..36fd817 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -15,8 +15,8 @@ func Copy(source interface{}, destin interface{}) { y := reflect.New(starX.Type()) starY := y.Elem() starY.Set(starX) - reflect.ValueOf(destin).Elem().Set(y.Elem()) + dest.Elem().Set(y.Elem()) } else { - reflect.ValueOf(destin).Elem().Set(x) + dest.Elem().Set(x) } } diff --git a/internal/utils_test.go b/internal/utils_test.go index 1f9b923..0d65d9a 100644 --- a/internal/utils_test.go +++ b/internal/utils_test.go @@ -1,19 +1,21 @@ package internal_test import ( + "testing" + "github.com/stretchr/testify/assert" "github.com/violetpay-org/queue-streamer/internal" - "testing" ) +type TestStruct struct { + Name string + Age int +} + func TestCopy(t *testing.T) { t.Parallel() t.Run("Copy pointer", func(t *testing.T) { - type TestStruct struct { - Name string - Age int - } source := &TestStruct{Name: "John", Age: 25} var destin TestStruct @@ -25,10 +27,6 @@ func TestCopy(t *testing.T) { }) t.Run("Copy value", func(t *testing.T) { - type TestStruct struct { - Name string - Age int - } source := TestStruct{Name: "John", Age: 25} var destin TestStruct