Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions go/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type stream struct {
streamCtxCancel context.CancelFunc
closeError atomic.Value
connStatusCallback func(isConneccted bool, host string, origin string)
connMu sync.Mutex

waterMarkMu sync.Mutex
waterMark map[string]uint64
Expand Down Expand Up @@ -137,12 +138,17 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
return
}
go s.monitorConn(conn)
s.connMu.Lock()
s.conns = append(s.conns, conn)
s.connMu.Unlock()
}()
continue
} else {
s.connMu.Lock()
s.conns = append(s.conns, conn)
s.connMu.Unlock()
go s.monitorConn(conn)
}
go s.monitorConn(conn)
s.conns = append(s.conns, conn)
}

// Only fail if we couldn't connect to ANY origins
Expand All @@ -159,7 +165,9 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
return nil, err
}
go s.monitorConn(conn)
s.connMu.Lock()
s.conns = append(s.conns, conn)
s.connMu.Unlock()
s.stats.configuredConnections.Add(1)
}

Expand Down Expand Up @@ -214,12 +222,11 @@ func (s *stream) monitorConn(conn *wsConn) {
cancel()
// `Add(^uint64(0))` will decrement activeConnections
s.stats.activeConnections.Add(^uint64(0))
if s.connStatusCallback != nil {
go s.connStatusCallback(false, conn.host, conn.origin)
}

// check for stream close conditions before reconnect attempts
if ctxErr := s.streamCtx.Err(); ctxErr != nil || s.closed.Load() {
if s.connStatusCallback != nil {
s.connStatusCallback(false, conn.host, conn.origin)
}
if ctxErr != nil {
s.config.logInfo(
"client: stream websocket %s context done: %s",
Expand All @@ -230,6 +237,10 @@ func (s *stream) monitorConn(conn *wsConn) {
return
}

if s.connStatusCallback != nil {
go s.connStatusCallback(false, conn.host, conn.origin)
}

// reconnect protocol
if s.stats.activeConnections.Load() == 0 {
s.stats.fullReconnects.Add(1)
Expand Down Expand Up @@ -339,9 +350,12 @@ func (s *stream) Close() (err error) {
s.closingMutex.Lock()
defer s.closingMutex.Unlock()

s.connMu.Lock()
for x := 0; x < len(s.conns); x++ {
_ = s.conns[x].close()
}
s.connMu.Unlock()

close(s.output)
// return a pending error
if err, ok := s.closeError.Load().(error); ok {
Expand Down