diff --git a/client.go b/client.go index 75457ea..59d98db 100644 --- a/client.go +++ b/client.go @@ -23,6 +23,7 @@ type client interface { //Unsubscribe(subscription string) error Publish(subscription string, message message.Data) (string, error) OnPublishResponse(subscription string, onMsg func(message *message.Message)) + OnTransportError(onErr func(err error)) } //Option set the Client options, such as Transport, message extensions,etc. @@ -78,6 +79,10 @@ func (c *Client) OnPublishResponse(subscription string, onMsg func(message *mess c.opts.transport.OnPublishResponse(subscription, onMsg) } +func (c *Client) OnTransportError(onErr func(err error)) { + c.opts.transport.OnError(onErr) +} + //Disconnect closes all subscriptions and inform the server to remove any client-related state. //any subsequent method call to the client object will result in undefined behaviour. func (c *Client) Disconnect() error { diff --git a/extensions/debug.go b/extensions/debug.go index 5041eb1..7dde847 100644 --- a/extensions/debug.go +++ b/extensions/debug.go @@ -8,7 +8,7 @@ import ( ) func debugJson(v interface{}) string { - b, _ := json.MarshalIndent(v, "", " ") + b, _ := json.MarshalIndent(v, "", " ") return string(b) } @@ -20,7 +20,7 @@ type DebugExtension struct { func NewDebugExtension(out io.Writer) *DebugExtension { li := log.New(out, "InMsg", 0) - lo := log.New(out, "outMsg", 0) + lo := log.New(out, "OutMsg", 0) return &DebugExtension{in: li, out: lo} } diff --git a/transport/transport.go b/transport/transport.go index f474b2b..ba11d9b 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -48,6 +48,8 @@ type Transport interface { //ever be triggered //can be used to identify the status of the published request and for example retry failed published requests OnPublishResponse(subscription string, onMsg func(message *message.Message)) + //OnError sets the handler to be triggered if some error appears + OnError(onErr func(err error)) } //MetaMessage are channels commencing with the /meta/ segment ans, are the channels used by the faye protocol itself. diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 9c71647..210f59b 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -37,6 +37,7 @@ type Websocket struct { onPubResponseMu sync.Mutex //todo sync.Map onPublishResponse map[string]func(message *message.Message) + onError func(err error) } var _ transport.Transport = (*Websocket)(nil) @@ -52,6 +53,7 @@ func (w *Websocket) Init(endpoint string, options *transport.Options) error { //w.subs = map[string]chan *message.Message{} w.subscriptions = map[string][]*subscription.Subscription{} w.onPublishResponse = map[string]func(message *message.Message){} + w.onError = func(err error){} w.stopCh = make(chan error) w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, options.Headers) if err != nil { @@ -83,12 +85,26 @@ func (w *Websocket) readWorker() error { if transport.IsMetaMessage(msg) { //handle it switch msg.Channel { + case transport.MetaConnect: + advise := w.advice.Load().(*message.Advise) + if advise.Reconnect == message.ReconnectNone { + return w.Disconnect() + } + m := message.Message{ + Channel: transport.MetaConnect, + ClientId: w.clientID, + ConnectionType: transportName, + Id: w.nextMsgID(), + } + if err = w.sendMessage(&m); err != nil { + return err + } case transport.MetaSubscribe: //handle MetaSubscribe resp w.subscriptionsMu.Lock() subscriptions, ok := w.subscriptions[msg.Subscription] if !ok { - panic("BUG: subscription not registered `" + msg.Subscription + "`") + return fmt.Errorf("BUG: subscription not registered `%s`", msg.Subscription) } if !msg.Successful { if msg.GetError() == nil { @@ -126,7 +142,6 @@ func (w *Websocket) readWorker() error { } } w.subscriptionsMu.Unlock() - } continue @@ -238,7 +253,13 @@ func (w *Websocket) Connect() error { ConnectionType: transportName, Id: w.nextMsgID(), } - go w.readWorker() + + go func () { + if err := w.readWorker(); err != nil { + w.onError(err) + } + }() + return w.sendMessage(&m) } @@ -295,10 +316,10 @@ func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error //todo timeout here err := <-subRes if err != nil { - log.Println(err) + //log.Println(err) return nil, err } - log.Println(sub) + //log.Println(sub) return sub, nil } @@ -368,6 +389,10 @@ func (w *Websocket) OnPublishResponse(subscription string, onMsg func(message *m w.onPubResponseMu.Unlock() } +func (w *Websocket) OnError(onErr func(err error)) { + w.onError = onErr +} + func (w *Websocket) handleAdvise(m *message.Advise) { //todo actually handle the advice w.advice.Store(m)