From 88420d47373cb1bfa9a873dc85982dee76aa7f66 Mon Sep 17 00:00:00 2001 From: Sergei Vizel Date: Mon, 17 Sep 2018 08:28:54 +0300 Subject: [PATCH 1/4] keep connected && transport error handler --- client.go | 5 +++++ transport/transport.go | 2 ++ transport/websocket/websocket.go | 34 +++++++++++++++++++++++++++----- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/client.go b/client.go index 75457ea..59d98db 100644 --- a/client.go +++ b/client.go @@ -23,6 +23,7 @@ type client interface { //Unsubscribe(subscription string) error Publish(subscription string, message message.Data) (string, error) OnPublishResponse(subscription string, onMsg func(message *message.Message)) + OnTransportError(onErr func(err error)) } //Option set the Client options, such as Transport, message extensions,etc. @@ -78,6 +79,10 @@ func (c *Client) OnPublishResponse(subscription string, onMsg func(message *mess c.opts.transport.OnPublishResponse(subscription, onMsg) } +func (c *Client) OnTransportError(onErr func(err error)) { + c.opts.transport.OnError(onErr) +} + //Disconnect closes all subscriptions and inform the server to remove any client-related state. //any subsequent method call to the client object will result in undefined behaviour. func (c *Client) Disconnect() error { diff --git a/transport/transport.go b/transport/transport.go index f474b2b..ba11d9b 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -48,6 +48,8 @@ type Transport interface { //ever be triggered //can be used to identify the status of the published request and for example retry failed published requests OnPublishResponse(subscription string, onMsg func(message *message.Message)) + //OnError sets the handler to be triggered if some error appears + OnError(onErr func(err error)) } //MetaMessage are channels commencing with the /meta/ segment ans, are the channels used by the faye protocol itself. diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 9c71647..271f550 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -37,6 +37,7 @@ type Websocket struct { onPubResponseMu sync.Mutex //todo sync.Map onPublishResponse map[string]func(message *message.Message) + onError func(err error) } var _ transport.Transport = (*Websocket)(nil) @@ -52,6 +53,7 @@ func (w *Websocket) Init(endpoint string, options *transport.Options) error { //w.subs = map[string]chan *message.Message{} w.subscriptions = map[string][]*subscription.Subscription{} w.onPublishResponse = map[string]func(message *message.Message){} + w.onError = func(err error){} w.stopCh = make(chan error) w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, options.Headers) if err != nil { @@ -60,17 +62,17 @@ func (w *Websocket) Init(endpoint string, options *transport.Options) error { return nil } -func (w *Websocket) readWorker() error { +func (w *Websocket) readWorker() { for { select { case err := <-w.stopCh: - return err + panic(err) default: } var payload []message.Message err := w.conn.ReadJSON(&payload) if err != nil { - return err + panic(err) } //dispatch msg := &payload[0] @@ -83,6 +85,16 @@ func (w *Websocket) readWorker() error { if transport.IsMetaMessage(msg) { //handle it switch msg.Channel { + case transport.MetaConnect: + m := message.Message{ + Channel: transport.MetaConnect, + ClientId: w.clientID, + ConnectionType: transportName, + Id: w.nextMsgID(), + } + if err = w.sendMessage(&m); err != nil { + panic(err) + } case transport.MetaSubscribe: //handle MetaSubscribe resp w.subscriptionsMu.Lock() @@ -126,7 +138,6 @@ func (w *Websocket) readWorker() error { } } w.subscriptionsMu.Unlock() - } continue @@ -238,7 +249,16 @@ func (w *Websocket) Connect() error { ConnectionType: transportName, Id: w.nextMsgID(), } - go w.readWorker() + + go func () { + defer func() { + if r := recover(); r != nil { + w.onError(fmt.Errorf("%v", r)) + } + }() + w.readWorker() + }() + return w.sendMessage(&m) } @@ -368,6 +388,10 @@ func (w *Websocket) OnPublishResponse(subscription string, onMsg func(message *m w.onPubResponseMu.Unlock() } +func (w *Websocket) OnError(onErr func(err error)) { + w.onError = onErr +} + func (w *Websocket) handleAdvise(m *message.Advise) { //todo actually handle the advice w.advice.Store(m) From f9d710b126d9f40e560938723476b509f2e7fb69 Mon Sep 17 00:00:00 2001 From: Sergei Vizel Date: Tue, 18 Sep 2018 13:51:05 +0300 Subject: [PATCH 2/4] remove tmp logs && debug output fix --- extensions/debug.go | 4 ++-- transport/websocket/websocket.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extensions/debug.go b/extensions/debug.go index 5041eb1..7dde847 100644 --- a/extensions/debug.go +++ b/extensions/debug.go @@ -8,7 +8,7 @@ import ( ) func debugJson(v interface{}) string { - b, _ := json.MarshalIndent(v, "", " ") + b, _ := json.MarshalIndent(v, "", " ") return string(b) } @@ -20,7 +20,7 @@ type DebugExtension struct { func NewDebugExtension(out io.Writer) *DebugExtension { li := log.New(out, "InMsg", 0) - lo := log.New(out, "outMsg", 0) + lo := log.New(out, "OutMsg", 0) return &DebugExtension{in: li, out: lo} } diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 271f550..88b5bfa 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -315,10 +315,10 @@ func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error //todo timeout here err := <-subRes if err != nil { - log.Println(err) + //log.Println(err) return nil, err } - log.Println(sub) + //log.Println(sub) return sub, nil } From c45cd2a7b3dff5ea6d1efd5784810e3f07198eed Mon Sep 17 00:00:00 2001 From: Sergei Vizel Date: Thu, 20 Sep 2018 08:48:33 +0300 Subject: [PATCH 3/4] return error instead of panic --- transport/websocket/websocket.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 88b5bfa..9dc49ee 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -62,17 +62,17 @@ func (w *Websocket) Init(endpoint string, options *transport.Options) error { return nil } -func (w *Websocket) readWorker() { +func (w *Websocket) readWorker() error { for { select { case err := <-w.stopCh: - panic(err) + return err default: } var payload []message.Message err := w.conn.ReadJSON(&payload) if err != nil { - panic(err) + return err } //dispatch msg := &payload[0] @@ -93,14 +93,14 @@ func (w *Websocket) readWorker() { Id: w.nextMsgID(), } if err = w.sendMessage(&m); err != nil { - panic(err) + return err } case transport.MetaSubscribe: //handle MetaSubscribe resp w.subscriptionsMu.Lock() subscriptions, ok := w.subscriptions[msg.Subscription] if !ok { - panic("BUG: subscription not registered `" + msg.Subscription + "`") + return fmt.Errorf("BUG: subscription not registered `%s`", msg.Subscription) } if !msg.Successful { if msg.GetError() == nil { @@ -251,12 +251,8 @@ func (w *Websocket) Connect() error { } go func () { - defer func() { - if r := recover(); r != nil { - w.onError(fmt.Errorf("%v", r)) - } - }() - w.readWorker() + err := w.readWorker() + w.onError(err) }() return w.sendMessage(&m) From e46ce92706a51b2c404a4b2621425d43289f7f23 Mon Sep 17 00:00:00 2001 From: Sergei Vizel Date: Thu, 20 Sep 2018 09:22:09 +0300 Subject: [PATCH 4/4] stop reconnect, if the server sends advise.reconnect==none --- transport/websocket/websocket.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 9dc49ee..210f59b 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -86,6 +86,10 @@ func (w *Websocket) readWorker() error { //handle it switch msg.Channel { case transport.MetaConnect: + advise := w.advice.Load().(*message.Advise) + if advise.Reconnect == message.ReconnectNone { + return w.Disconnect() + } m := message.Message{ Channel: transport.MetaConnect, ClientId: w.clientID, @@ -251,8 +255,9 @@ func (w *Websocket) Connect() error { } go func () { - err := w.readWorker() - w.onError(err) + if err := w.readWorker(); err != nil { + w.onError(err) + } }() return w.sendMessage(&m)