From c084c269b812b093d473671d62f40c9dfeb0adc4 Mon Sep 17 00:00:00 2001 From: Richard SY Han Date: Mon, 10 Jun 2024 22:58:51 +0900 Subject: [PATCH 1/6] fix: *IMPORTANT: fix possibility of memory leak there's possibility of memory leak when if goroutine exits not normally. --- internal/consumer_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/consumer_test.go b/internal/consumer_test.go index c872cba..b038f71 100644 --- a/internal/consumer_test.go +++ b/internal/consumer_test.go @@ -3,6 +3,9 @@ package internal_test import ( "context" "errors" + "testing" + "time" + "github.com/IBM/sarama" "github.com/stretchr/testify/assert" "github.com/violetpay-org/queue-streamer/common" @@ -136,6 +139,7 @@ func TestStreamConsumer_ConsumeClaim(t *testing.T) { msg.DataChan = make(chan *sarama.ConsumerMessage, 1) go func() { + defer close(msg.DataChan) time.Sleep(1 * time.Second) msg.DataChan <- &sarama.ConsumerMessage{ Topic: "test", @@ -145,7 +149,6 @@ func TestStreamConsumer_ConsumeClaim(t *testing.T) { Offset: 0, } time.Sleep(1 * time.Second) - close(msg.DataChan) }() assert.Equal(t, 0, len(consumer.ProducerPool().Producers())) From aecb74190369a771d27850bd468609ee78b31e9b Mon Sep 17 00:00:00 2001 From: Richard SY Han Date: Mon, 10 Jun 2024 22:59:12 +0900 Subject: [PATCH 2/6] chore: removing unused pkgs --- internal/consumer_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/consumer_test.go b/internal/consumer_test.go index b038f71..b0994bb 100644 --- a/internal/consumer_test.go +++ b/internal/consumer_test.go @@ -10,8 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/violetpay-org/queue-streamer/common" "github.com/violetpay-org/queue-streamer/internal" - "testing" - "time" ) var cbrokers = []string{"localhost:9093"} From 8763ca2cdcac9561f2d86db587863e5abf840f3f Mon Sep 17 00:00:00 2001 From: Richard SY Han Date: Mon, 10 Jun 2024 22:59:36 +0900 Subject: [PATCH 3/6] chore: improve code readability --- internal/consumer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/consumer.go b/internal/consumer.go index 8bf6707..4566182 100644 --- a/internal/consumer.go +++ b/internal/consumer.go @@ -4,10 +4,11 @@ import ( "context" "errors" "fmt" - "github.com/IBM/sarama" - "github.com/violetpay-org/queue-streamer/common" "sync" "time" + + "github.com/IBM/sarama" + "github.com/violetpay-org/queue-streamer/common" ) var transactionalId int32 = 0 From 78edc630c34b74b5bde291b0e4f924c321831dae Mon Sep 17 00:00:00 2001 From: Richard SY Han Date: Mon, 10 Jun 2024 23:00:40 +0900 Subject: [PATCH 4/6] fix: *IMPORTANT prevent from infinite loop There's possibility of infinite loop in code. Resolved it by defining maxRetries --- internal/consumer.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/consumer.go b/internal/consumer.go index 4566182..5917e6c 100644 --- a/internal/consumer.go +++ b/internal/consumer.go @@ -235,7 +235,9 @@ func (consumer *StreamConsumer) HandleTxnError(producer sarama.AsyncProducer, me func (consumer *StreamConsumer) handleTxnError(producer sarama.AsyncProducer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, err error, defaulthandler func() error) { fmt.Printf("Message consumer: unable to process transaction: %+v", err) - for { + retryCount := 0 + maxRetries := 30 + for retryCount < maxRetries { if producer.TxnStatus()&sarama.ProducerTxnFlagFatalError != 0 { // fatal error. need to recreate producer. fmt.Println("Message consumer: producer is in a fatal state, need to recreate it") @@ -259,4 +261,7 @@ func (consumer *StreamConsumer) handleTxnError(producer sarama.AsyncProducer, me return } } + if retryCount == maxRetries { + fmt.Println("Error: failed to commit transaction after", maxRetries, "retries") + } } From 74d292b4144aa9d89d80a273977fe8142467cc3b Mon Sep 17 00:00:00 2001 From: Richard SY Han Date: Mon, 10 Jun 2024 23:00:57 +0900 Subject: [PATCH 5/6] chore: improved code readability and performance --- internal/utils.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/utils.go b/internal/utils.go index fcb1144..36fd817 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -15,8 +15,8 @@ func Copy(source interface{}, destin interface{}) { y := reflect.New(starX.Type()) starY := y.Elem() starY.Set(starX) - reflect.ValueOf(destin).Elem().Set(y.Elem()) + dest.Elem().Set(y.Elem()) } else { - reflect.ValueOf(destin).Elem().Set(x) + dest.Elem().Set(x) } } From 6b2f2de342a1c723268a90c63585db38bd71e608 Mon Sep 17 00:00:00 2001 From: Richard SY Han Date: Mon, 10 Jun 2024 23:01:11 +0900 Subject: [PATCH 6/6] chore: improve code readability --- internal/utils_test.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/internal/utils_test.go b/internal/utils_test.go index 1f9b923..0d65d9a 100644 --- a/internal/utils_test.go +++ b/internal/utils_test.go @@ -1,19 +1,21 @@ package internal_test import ( + "testing" + "github.com/stretchr/testify/assert" "github.com/violetpay-org/queue-streamer/internal" - "testing" ) +type TestStruct struct { + Name string + Age int +} + func TestCopy(t *testing.T) { t.Parallel() t.Run("Copy pointer", func(t *testing.T) { - type TestStruct struct { - Name string - Age int - } source := &TestStruct{Name: "John", Age: 25} var destin TestStruct @@ -25,10 +27,6 @@ func TestCopy(t *testing.T) { }) t.Run("Copy value", func(t *testing.T) { - type TestStruct struct { - Name string - Age int - } source := TestStruct{Name: "John", Age: 25} var destin TestStruct