Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type client interface {
//Unsubscribe(subscription string) error
Publish(subscription string, message message.Data) (string, error)
OnPublishResponse(subscription string, onMsg func(message *message.Message))
OnTransportError(onErr func(err error))
}

//Option set the Client options, such as Transport, message extensions,etc.
Expand Down Expand Up @@ -78,6 +79,10 @@ func (c *Client) OnPublishResponse(subscription string, onMsg func(message *mess
c.opts.transport.OnPublishResponse(subscription, onMsg)
}

func (c *Client) OnTransportError(onErr func(err error)) {
c.opts.transport.OnError(onErr)
}

//Disconnect closes all subscriptions and inform the server to remove any client-related state.
//any subsequent method call to the client object will result in undefined behaviour.
func (c *Client) Disconnect() error {
Expand Down
4 changes: 2 additions & 2 deletions extensions/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func debugJson(v interface{}) string {
b, _ := json.MarshalIndent(v, "", " ")
b, _ := json.MarshalIndent(v, "", " ")
return string(b)
}

Expand All @@ -20,7 +20,7 @@ type DebugExtension struct {

func NewDebugExtension(out io.Writer) *DebugExtension {
li := log.New(out, "InMsg", 0)
lo := log.New(out, "outMsg", 0)
lo := log.New(out, "OutMsg", 0)
return &DebugExtension{in: li, out: lo}
}

Expand Down
2 changes: 2 additions & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Transport interface {
//ever be triggered
//can be used to identify the status of the published request and for example retry failed published requests
OnPublishResponse(subscription string, onMsg func(message *message.Message))
//OnError sets the handler to be triggered if some error appears
OnError(onErr func(err error))
}

//MetaMessage are channels commencing with the /meta/ segment ans, are the channels used by the faye protocol itself.
Expand Down
35 changes: 30 additions & 5 deletions transport/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Websocket struct {

onPubResponseMu sync.Mutex //todo sync.Map
onPublishResponse map[string]func(message *message.Message)
onError func(err error)
}

var _ transport.Transport = (*Websocket)(nil)
Expand All @@ -52,6 +53,7 @@ func (w *Websocket) Init(endpoint string, options *transport.Options) error {
//w.subs = map[string]chan *message.Message{}
w.subscriptions = map[string][]*subscription.Subscription{}
w.onPublishResponse = map[string]func(message *message.Message){}
w.onError = func(err error){}
w.stopCh = make(chan error)
w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, options.Headers)
if err != nil {
Expand Down Expand Up @@ -83,12 +85,26 @@ func (w *Websocket) readWorker() error {
if transport.IsMetaMessage(msg) {
//handle it
switch msg.Channel {
case transport.MetaConnect:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is going to be handled in the upcoming commits as well, we shouldn't reconnect always.
If the server sends advise.reconnect==none we should stop

advise := w.advice.Load().(*message.Advise)
if advise.Reconnect == message.ReconnectNone {
return w.Disconnect()
}
m := message.Message{
Channel: transport.MetaConnect,
ClientId: w.clientID,
ConnectionType: transportName,
Id: w.nextMsgID(),
}
if err = w.sendMessage(&m); err != nil {
return err
}
case transport.MetaSubscribe:
//handle MetaSubscribe resp
w.subscriptionsMu.Lock()
subscriptions, ok := w.subscriptions[msg.Subscription]
if !ok {
panic("BUG: subscription not registered `" + msg.Subscription + "`")
return fmt.Errorf("BUG: subscription not registered `%s`", msg.Subscription)
}
if !msg.Successful {
if msg.GetError() == nil {
Expand Down Expand Up @@ -126,7 +142,6 @@ func (w *Websocket) readWorker() error {
}
}
w.subscriptionsMu.Unlock()

}

continue
Expand Down Expand Up @@ -238,7 +253,13 @@ func (w *Websocket) Connect() error {
ConnectionType: transportName,
Id: w.nextMsgID(),
}
go w.readWorker()

go func () {
if err := w.readWorker(); err != nil {
w.onError(err)
}
}()

return w.sendMessage(&m)
}

Expand Down Expand Up @@ -295,10 +316,10 @@ func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error
//todo timeout here
err := <-subRes
if err != nil {
log.Println(err)
//log.Println(err)
return nil, err
}
log.Println(sub)
//log.Println(sub)
return sub, nil
}

Expand Down Expand Up @@ -368,6 +389,10 @@ func (w *Websocket) OnPublishResponse(subscription string, onMsg func(message *m
w.onPubResponseMu.Unlock()
}

func (w *Websocket) OnError(onErr func(err error)) {
w.onError = onErr
}

func (w *Websocket) handleAdvise(m *message.Advise) {
//todo actually handle the advice
w.advice.Store(m)
Expand Down