diff --git a/go/stream.go b/go/stream.go index 6f0661d..e5da804 100644 --- a/go/stream.go +++ b/go/stream.go @@ -81,6 +81,7 @@ type stream struct { streamCtxCancel context.CancelFunc closeError atomic.Value connStatusCallback func(isConneccted bool, host string, origin string) + connMu sync.Mutex waterMarkMu sync.Mutex waterMark map[string]uint64 @@ -137,12 +138,17 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs return } go s.monitorConn(conn) + s.connMu.Lock() s.conns = append(s.conns, conn) + s.connMu.Unlock() }() continue + } else { + s.connMu.Lock() + s.conns = append(s.conns, conn) + s.connMu.Unlock() + go s.monitorConn(conn) } - go s.monitorConn(conn) - s.conns = append(s.conns, conn) } // Only fail if we couldn't connect to ANY origins @@ -159,7 +165,9 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs return nil, err } go s.monitorConn(conn) + s.connMu.Lock() s.conns = append(s.conns, conn) + s.connMu.Unlock() s.stats.configuredConnections.Add(1) } @@ -214,12 +222,11 @@ func (s *stream) monitorConn(conn *wsConn) { cancel() // `Add(^uint64(0))` will decrement activeConnections s.stats.activeConnections.Add(^uint64(0)) - if s.connStatusCallback != nil { - go s.connStatusCallback(false, conn.host, conn.origin) - } - // check for stream close conditions before reconnect attempts if ctxErr := s.streamCtx.Err(); ctxErr != nil || s.closed.Load() { + if s.connStatusCallback != nil { + s.connStatusCallback(false, conn.host, conn.origin) + } if ctxErr != nil { s.config.logInfo( "client: stream websocket %s context done: %s", @@ -230,6 +237,10 @@ func (s *stream) monitorConn(conn *wsConn) { return } + if s.connStatusCallback != nil { + go s.connStatusCallback(false, conn.host, conn.origin) + } + // reconnect protocol if s.stats.activeConnections.Load() == 0 { s.stats.fullReconnects.Add(1) @@ -339,9 +350,12 @@ func (s *stream) Close() (err error) { s.closingMutex.Lock() defer s.closingMutex.Unlock() + s.connMu.Lock() for x := 0; x < len(s.conns); x++ { _ = s.conns[x].close() } + s.connMu.Unlock() + close(s.output) // return a pending error if err, ok := s.closeError.Load().(error); ok {