Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions go/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type stream struct {
streamCtxCancel context.CancelFunc
closeError atomic.Value
connStatusCallback func(isConneccted bool, host string, origin string)
connMu sync.Mutex
Copy link
Collaborator

@akuzni2 akuzni2 Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add this could we remove closingMutex?

closingMutex also guards the connection, and we can likely just reuse connMu in its place


waterMarkMu sync.Mutex
waterMark map[string]uint64
Expand Down Expand Up @@ -137,12 +138,17 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
return
}
go s.monitorConn(conn)
s.connMu.Lock()
s.conns = append(s.conns, conn)
s.connMu.Unlock()
}()
continue
} else {
s.connMu.Lock()
s.conns = append(s.conns, conn)
s.connMu.Unlock()
go s.monitorConn(conn)
}
go s.monitorConn(conn)
s.conns = append(s.conns, conn)
}

// Only fail if we couldn't connect to ANY origins
Expand All @@ -159,7 +165,9 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
return nil, err
}
go s.monitorConn(conn)
s.connMu.Lock()
s.conns = append(s.conns, conn)
s.connMu.Unlock()
s.stats.configuredConnections.Add(1)
}

Expand Down Expand Up @@ -214,12 +222,11 @@ func (s *stream) monitorConn(conn *wsConn) {
cancel()
// `Add(^uint64(0))` will decrement activeConnections
s.stats.activeConnections.Add(^uint64(0))
if s.connStatusCallback != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move this one? It's nice to have one call site. In the replacement changes we have one call that's synchronous and another that's asynchronous.

go s.connStatusCallback(false, conn.host, conn.origin)
}

// check for stream close conditions before reconnect attempts
if ctxErr := s.streamCtx.Err(); ctxErr != nil || s.closed.Load() {
if s.connStatusCallback != nil {
s.connStatusCallback(false, conn.host, conn.origin)
}
if ctxErr != nil {
s.config.logInfo(
"client: stream websocket %s context done: %s",
Expand All @@ -230,6 +237,10 @@ func (s *stream) monitorConn(conn *wsConn) {
return
}

if s.connStatusCallback != nil {
go s.connStatusCallback(false, conn.host, conn.origin)
}

// reconnect protocol
if s.stats.activeConnections.Load() == 0 {
s.stats.fullReconnects.Add(1)
Expand Down Expand Up @@ -339,9 +350,12 @@ func (s *stream) Close() (err error) {
s.closingMutex.Lock()
defer s.closingMutex.Unlock()

s.connMu.Lock()
for x := 0; x < len(s.conns); x++ {
_ = s.conns[x].close()
}
s.connMu.Unlock()

close(s.output)
// return a pending error
if err, ok := s.closeError.Load().(error); ok {
Expand Down