Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/ok_hbase.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def self.thrift_type_to_dict(obj)
require 'ok_hbase/version'
require 'ok_hbase/client'
require 'ok_hbase/connection'
require 'ok_hbase/pool'
require 'ok_hbase/concerns'
require 'ok_hbase/table'
require 'ok_hbase/row'
Expand Down
14 changes: 14 additions & 0 deletions lib/ok_hbase/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ def open?
@transport && @transport.open?
end

def ping?
begin
return open? && tables
rescue
return false
end
end

def close
return unless open?
@transport.close
Expand Down Expand Up @@ -143,6 +151,12 @@ def table_name(name)
table_prefix && !name.start_with?(table_prefix) ? [table_prefix, name].join(table_prefix_separator) : name
end

def reset
OkHbase.logger.info("resetting connection")
_refresh_thrift_client
open
end

private

def _refresh_thrift_client
Expand Down
4 changes: 4 additions & 0 deletions lib/ok_hbase/no_connections_available.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module OkHbase
class NoConnectionsAvailable < RuntimeError
end
end
92 changes: 92 additions & 0 deletions lib/ok_hbase/pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
require 'thread'
require 'timeout'

require 'ok_hbase/connection'
require 'ok_hbase/no_connections_available'


module OkHbase
class Pool
@_lock
@_connection_queue
@_connection_ids

def initialize(size, opts={})
raise TypeError.new("'size' must be an integer") unless size.is_a? Integer
raise ArgumentError.new("'size' must be > 0") unless size > 0

OkHbase.logger.debug("Initializing connection pool with #{size} connections.")

@_lock = Mutex.new
@_connection_queue = Queue.new
@_connection_ids = []

connection_opts = opts.dup

connection_opts[:auto_connect] = false

size.times do
connection = OkHbase::Connection.new(connection_opts)
@_connection_queue << connection
@_connection_ids << connection.object_id
end

# The first connection is made immediately so that trivial
# mistakes like unresolvable host names are raised immediately.
# Subsequent connections are connected lazily.
self.with_connection {} if opts[:auto_connect]
end

def synchronize(&block)
@_lock.synchronize(&block)
end

def with_connection(timeout = nil)
connection = Thread.current[:ok_hbase_current_connection]

return_after_use = false

begin
unless connection
return_after_use = true
connection = get_connection(timeout)
Thread.current[:ok_hbase_current_connection] = connection
end
yield connection
rescue Apache::Hadoop::Hbase::Thrift::IOError, Thrift::TransportException, SocketError => e
raise e
ensure
if return_after_use
Thread.current[:ok_hbase_current_connection] = nil
return_connection(connection)
end
end
end

def get_connection(timeout = nil)
begin
connection = Timeout.timeout(timeout) do
@_connection_queue.deq
end
rescue TimeoutError
raise OkHbase::NoConnectionsAvailable.new("No connection available from pool within specified timeout: #{timeout}")
end
begin
connection.open()
connection.reset unless connection.ping?
rescue Apache::Hadoop::Hbase::Thrift::IOError, Thrift::TransportException, SocketError => e
connection.reset
raise e
end
connection
end

def return_connection(connection)
synchronize do
return unless @_connection_ids.include? connection.object_id
end

@_connection_queue << connection

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you checked whether enqueueing an element like this is thread safe? Or might you need to wrap that in a synchronize call?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the Queue class in ruby is meant specifically for inter-thread usage: http://www.ruby-doc.org/stdlib-1.9.3/libdoc/thread/rdoc/Queue.html

end
end
end