From 161979d56ec46ebafba2f472dec8a1081b1e3c8f Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Sun, 23 Jun 2013 15:49:24 -0700 Subject: [PATCH 01/12] add connection pool class --- lib/ok_hbase/no_connections_available.rb | 4 ++ lib/ok_hbase/pool.rb | 81 ++++++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 lib/ok_hbase/no_connections_available.rb create mode 100644 lib/ok_hbase/pool.rb diff --git a/lib/ok_hbase/no_connections_available.rb b/lib/ok_hbase/no_connections_available.rb new file mode 100644 index 0000000..c5e209c --- /dev/null +++ b/lib/ok_hbase/no_connections_available.rb @@ -0,0 +1,4 @@ +module OkHbase + class NoConnectionsAvailable < RuntimeError + end +end diff --git a/lib/ok_hbase/pool.rb b/lib/ok_hbase/pool.rb new file mode 100644 index 0000000..f1496ae --- /dev/null +++ b/lib/ok_hbase/pool.rb @@ -0,0 +1,81 @@ +require 'thread' +require 'timeout' + +require 'ok_hbase/connection' +require 'ok_hbase/no_connections_available' + + +module OkHbase + class Pool + @_lock + @_connection_queue + + def initialize(size, opts={}) + raise TypeError.new("'size' must be an integer") unless size.is_a? Integer + raise ArgumentError.new("'size' must be > 0") unless size > 0 + + OkHbase.logger.debug("Initializing connection pool with #{size} connections.") + + @_lock = Mutex.new + @_connection_queue = Queue.new + + connection_opts = opts + + connection_opts[:auto_connect] = false + + size.times do + connection = OkHbase::Connection.new(connection_opts) + @_connection_queue << connection + + self.connection {} + + end + end + + def connection(timeout = nil) + connection = Thread.current[:current_connection] + + return_after_use = false + + if connection + return_after_use = true + connection = _acquire_connection(timeout) + @_lock.synchronize do + Thread.current[:current_connection] = connection + end + end + + begin + connection.open() + yield connection + rescue Apache::Hadoop::Hbase::Thrift::IOError, Thrift::TransportException, SocketError=> e + OkHbase.logger.info("Replacing tainted pool connection") + + connection.send(:_refresh_thrift_client) + connection.open + raise e + ensure + if return_after_use + Thread.current.delete[:current_connection] + _return_connection(connection) + end + end + end + + private + + def _acquire_connection(timeout = nil) + begin + Timeout.timeout(timeout) do + return @_connection_queue.deq + end + rescue TimeoutError + raise OkHbase::NoConnectionsAvailable.new("No connection available from pool within specified timeout: #{timeout}") + end + end + + def _return_connection(connection) + @_connection_queue << connection + end + end +end From 471f7e466d76a4d3f1f3aae0dd63c66948017e3b Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Sun, 23 Jun 2013 16:02:52 -0700 Subject: [PATCH 02/12] fix connection conditional --- lib/ok_hbase/pool.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ok_hbase/pool.rb b/lib/ok_hbase/pool.rb index f1496ae..963b44d 100644 --- a/lib/ok_hbase/pool.rb +++ b/lib/ok_hbase/pool.rb @@ -37,7 +37,7 @@ def connection(timeout = nil) return_after_use = false - if connection + unless connection return_after_use = true connection = _acquire_connection(timeout) @_lock.synchronize do From 6e1e438f4d6b20e96307c2e4a9524ef1b3927f75 Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Sun, 23 Jun 2013 16:07:45 -0700 Subject: [PATCH 03/12] add comment from HappyBase about making the first connection --- lib/ok_hbase/pool.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/ok_hbase/pool.rb b/lib/ok_hbase/pool.rb index 963b44d..82bcd2c 100644 --- a/lib/ok_hbase/pool.rb +++ b/lib/ok_hbase/pool.rb @@ -26,10 +26,12 @@ def initialize(size, opts={}) size.times do connection = OkHbase::Connection.new(connection_opts) @_connection_queue << connection - - self.connection {} - end + + # The first connection is made immediately so that trivial + # mistakes like unresolvable host names are raised immediately. + # Subsequent connections are connected lazily. + self.connection {} end def connection(timeout = nil) @@ -48,7 +50,7 @@ def connection(timeout = nil) begin connection.open() yield connection - rescue Apache::Hadoop::Hbase::Thrift::IOError, Thrift::TransportException, SocketError=> e + rescue Apache::Hadoop::Hbase::Thrift::IOError, Thrift::TransportException, SocketError => e OkHbase.logger.info("Replacing tainted pool connection") connection.send(:_refresh_thrift_client) From 3340a9ec7e5219b77408206df4d18fd727ce32ab Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Sun, 23 Jun 2013 16:42:20 -0700 Subject: [PATCH 04/12] only eagerly load the first connection if opts[:auto_connect] is truthy --- lib/ok_hbase/pool.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ok_hbase/pool.rb b/lib/ok_hbase/pool.rb index 82bcd2c..c482941 100644 --- a/lib/ok_hbase/pool.rb +++ b/lib/ok_hbase/pool.rb @@ -19,7 +19,7 @@ def initialize(size, opts={}) @_lock = Mutex.new @_connection_queue = Queue.new - connection_opts = opts + connection_opts = opts.dup connection_opts[:auto_connect] = false @@ -31,7 +31,7 @@ def initialize(size, opts={}) # The first connection is made immediately so that trivial # mistakes like unresolvable host names are raised immediately. # Subsequent connections are connected lazily. - self.connection {} + self.connection {} if opts[:auto_connect] end def connection(timeout = nil) From c0aec398a415b2690a72558676916049e131a414 Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Sun, 23 Jun 2013 16:47:43 -0700 Subject: [PATCH 05/12] rename current thread connection variable name to be more specific --- lib/ok_hbase/pool.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/ok_hbase/pool.rb b/lib/ok_hbase/pool.rb index c482941..f9a7a0f 100644 --- a/lib/ok_hbase/pool.rb +++ b/lib/ok_hbase/pool.rb @@ -35,7 +35,7 @@ def initialize(size, opts={}) end def connection(timeout = nil) - connection = Thread.current[:current_connection] + connection = Thread.current[:ok_hbase_current_connection] return_after_use = false @@ -43,7 +43,7 @@ def connection(timeout = nil) return_after_use = true connection = _acquire_connection(timeout) @_lock.synchronize do - Thread.current[:current_connection] = connection + Thread.current[:ok_hbase_current_connection] = connection end end @@ -58,7 +58,7 @@ def connection(timeout = nil) raise e ensure if return_after_use - Thread.current.delete[:current_connection] + Thread.current.delete[:ok_hbase_current_connection] _return_connection(connection) end end From 17dcb0115ca128e1e709c6427db1d0515706e4fc Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Sun, 23 Jun 2013 16:50:20 -0700 Subject: [PATCH 06/12] remove unneeded mutex/lock --- lib/ok_hbase/pool.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/ok_hbase/pool.rb b/lib/ok_hbase/pool.rb index f9a7a0f..5e1989d 100644 --- a/lib/ok_hbase/pool.rb +++ b/lib/ok_hbase/pool.rb @@ -16,7 +16,6 @@ def initialize(size, opts={}) OkHbase.logger.debug("Initializing connection pool with #{size} connections.") - @_lock = Mutex.new @_connection_queue = Queue.new connection_opts = opts.dup @@ -42,9 +41,7 @@ def connection(timeout = nil) unless connection return_after_use = true connection = _acquire_connection(timeout) - @_lock.synchronize do - Thread.current[:ok_hbase_current_connection] = connection - end + Thread.current[:ok_hbase_current_connection] = connection end begin From c38ea5232b6559a122682c756407dacd6555d276 Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Fri, 5 Jul 2013 17:06:22 -0500 Subject: [PATCH 07/12] fix current connection reset --- lib/ok_hbase/pool.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ok_hbase/pool.rb b/lib/ok_hbase/pool.rb index 5e1989d..9b2e0ca 100644 --- a/lib/ok_hbase/pool.rb +++ b/lib/ok_hbase/pool.rb @@ -55,7 +55,7 @@ def connection(timeout = nil) raise e ensure if return_after_use - Thread.current.delete[:ok_hbase_current_connection] + Thread.current[:ok_hbase_current_connection] = nil _return_connection(connection) end end From faf9b9ad285dd0c752093ee751f1fc6952be3946 Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Fri, 5 Jul 2013 17:07:08 -0500 Subject: [PATCH 08/12] require ok_hbase/pool --- lib/ok_hbase.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/ok_hbase.rb b/lib/ok_hbase.rb index e852089..cf73569 100644 --- a/lib/ok_hbase.rb +++ b/lib/ok_hbase.rb @@ -33,6 +33,7 @@ def self.thrift_type_to_dict(obj) require 'ok_hbase/version' require 'ok_hbase/client' require 'ok_hbase/connection' +require 'ok_hbase/pool' require 'ok_hbase/concerns' require 'ok_hbase/table' require 'ok_hbase/row' From 10c9266647eb6efb652226404f56ff834ccfbf58 Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Fri, 5 Jul 2013 17:45:37 -0500 Subject: [PATCH 09/12] add OkHbase::Connection#ping? --- lib/ok_hbase/connection.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/ok_hbase/connection.rb b/lib/ok_hbase/connection.rb index 70cc80e..a23e5fe 100644 --- a/lib/ok_hbase/connection.rb +++ b/lib/ok_hbase/connection.rb @@ -64,6 +64,14 @@ def open? @transport && @transport.open? end + def ping? + begin + return open? && tables + rescue + return false + end + end + def close return unless open? @transport.close From cc88378015ad33bdf31ffcc6cea2450b4365c46c Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Fri, 5 Jul 2013 17:48:36 -0500 Subject: [PATCH 10/12] handle connection failures better --- lib/ok_hbase/pool.rb | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/ok_hbase/pool.rb b/lib/ok_hbase/pool.rb index 9b2e0ca..c62db19 100644 --- a/lib/ok_hbase/pool.rb +++ b/lib/ok_hbase/pool.rb @@ -46,12 +46,11 @@ def connection(timeout = nil) begin connection.open() + _reset_connection(connection) unless connection.ping? + yield connection rescue Apache::Hadoop::Hbase::Thrift::IOError, Thrift::TransportException, SocketError => e - OkHbase.logger.info("Replacing tainted pool connection") - - connection.send(:_refresh_thrift_client) - connection.open + _reset_connection(connection) raise e ensure if return_after_use @@ -73,6 +72,13 @@ def _acquire_connection(timeout = nil) end end + def _reset_connection(connection) + OkHbase.logger.info("Replacing tainted pool connection") + + connection.send(:_refresh_thrift_client) + connection.open + end + def _return_connection(connection) @_connection_queue << connection end From 4163799bfdac5df9d842a07cb644de0acdf54d06 Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Sun, 25 Aug 2013 11:32:18 -0700 Subject: [PATCH 11/12] add Connection#reset --- lib/ok_hbase/connection.rb | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/ok_hbase/connection.rb b/lib/ok_hbase/connection.rb index a23e5fe..7727cd3 100644 --- a/lib/ok_hbase/connection.rb +++ b/lib/ok_hbase/connection.rb @@ -153,6 +153,12 @@ def table_name(name) table_prefix && !name.start_with?(table_prefix) ? [table_prefix, name].join(table_prefix_separator) : name end + def reset + OkHbase.logger.info("resetting connection") + _refresh_thrift_client + open + end + private def _refresh_thrift_client From 51b42ad197cf7e9668fba695ce78a94f5cd8b0d3 Mon Sep 17 00:00:00 2001 From: Nathan Keyes Date: Sun, 25 Aug 2013 11:42:41 -0700 Subject: [PATCH 12/12] track pool connection ids Refactor: connection -> with_connection _acquire_connection -> get_connection _return_connection -> return_connection --- lib/ok_hbase/pool.rb | 56 ++++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/lib/ok_hbase/pool.rb b/lib/ok_hbase/pool.rb index c62db19..c745b6b 100644 --- a/lib/ok_hbase/pool.rb +++ b/lib/ok_hbase/pool.rb @@ -9,6 +9,7 @@ module OkHbase class Pool @_lock @_connection_queue + @_connection_ids def initialize(size, opts={}) raise TypeError.new("'size' must be an integer") unless size.is_a? Integer @@ -16,7 +17,9 @@ def initialize(size, opts={}) OkHbase.logger.debug("Initializing connection pool with #{size} connections.") + @_lock = Mutex.new @_connection_queue = Queue.new + @_connection_ids = [] connection_opts = opts.dup @@ -25,61 +28,64 @@ def initialize(size, opts={}) size.times do connection = OkHbase::Connection.new(connection_opts) @_connection_queue << connection + @_connection_ids << connection.object_id end # The first connection is made immediately so that trivial # mistakes like unresolvable host names are raised immediately. # Subsequent connections are connected lazily. - self.connection {} if opts[:auto_connect] + self.with_connection {} if opts[:auto_connect] end - def connection(timeout = nil) + def synchronize(&block) + @_lock.synchronize(&block) + end + + def with_connection(timeout = nil) connection = Thread.current[:ok_hbase_current_connection] return_after_use = false - unless connection - return_after_use = true - connection = _acquire_connection(timeout) - Thread.current[:ok_hbase_current_connection] = connection - end - begin - connection.open() - _reset_connection(connection) unless connection.ping? - + unless connection + return_after_use = true + connection = get_connection(timeout) + Thread.current[:ok_hbase_current_connection] = connection + end yield connection rescue Apache::Hadoop::Hbase::Thrift::IOError, Thrift::TransportException, SocketError => e - _reset_connection(connection) raise e ensure if return_after_use Thread.current[:ok_hbase_current_connection] = nil - _return_connection(connection) + return_connection(connection) end end end - private - - def _acquire_connection(timeout = nil) + def get_connection(timeout = nil) begin - Timeout.timeout(timeout) do - return @_connection_queue.deq + connection = Timeout.timeout(timeout) do + @_connection_queue.deq end rescue TimeoutError raise OkHbase::NoConnectionsAvailable.new("No connection available from pool within specified timeout: #{timeout}") end + begin + connection.open() + connection.reset unless connection.ping? + rescue Apache::Hadoop::Hbase::Thrift::IOError, Thrift::TransportException, SocketError => e + connection.reset + raise e + end + connection end - def _reset_connection(connection) - OkHbase.logger.info("Replacing tainted pool connection") - - connection.send(:_refresh_thrift_client) - connection.open - end + def return_connection(connection) + synchronize do + return unless @_connection_ids.include? connection.object_id + end - def _return_connection(connection) @_connection_queue << connection end end