Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions lib/sonic/channels/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@ def help(manual)
end

def quit
execute('QUIT')
if connection.connected?
execute('QUIT')
connection.disconnect
return true
end

return false
end

def close
connection.disconnect
end

def connected?
connection.connected?
end

private

def execute(*args)
Expand All @@ -33,7 +46,8 @@ def normalize(value)
end

def sanitize(value)
value.gsub('"', '\\"').gsub(/[\r\n]+/, ' ')
# remove backslashes entirely
value.gsub(/\\/, "").gsub('"', '\\"').gsub(/[\r\n]+/, '\\n')
end

def quote(value)
Expand All @@ -46,7 +60,7 @@ def type_cast_response(value)
elsif value.start_with?('RESULT ')
value.split(' ').last.to_i
elsif value.start_with?('EVENT ')
value.split(' ')[3..-1].join(' ')
value.split(' ')[3..-1]
else
value
end
Expand Down
4 changes: 4 additions & 0 deletions lib/sonic/channels/control.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ class Control < Base
def trigger(action)
execute('TRIGGER', action)
end

def info
execute('INFO')
end
end
end
end
4 changes: 2 additions & 2 deletions lib/sonic/channels/ingest.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ module Sonic
module Channels
class Ingest < Base
def push(collection, bucket, object, text, lang = nil)
arr = [collection, bucket, object, quote(text)]
arr = [collection, bucket, object, normalize(text)]
arr << "LANG(#{lang})" if lang

execute('PUSH', *arr)
end

def pop(collection, bucket, object, text)
execute('POP', collection, bucket, object, quote(text))
execute('POP', collection, bucket, object, normalize(text))
end

def count(collection, bucket = nil, object = nil)
Expand Down
14 changes: 12 additions & 2 deletions lib/sonic/channels/search.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Sonic
module Channels
class Search < Base
def query(collection, bucket, terms, limit = nil, offset = nil, lang = nil) # rubocop:disable Metrics/ParameterLists, Metrics/LineLength
arr = [collection, bucket, quote(terms)]
arr = [collection, bucket, normalize(terms)]
arr << "LIMIT(#{limit})" if limit
arr << "OFFSET(#{offset})" if offset
arr << "LANG(#{lang})" if lang
Expand All @@ -13,13 +13,23 @@ def query(collection, bucket, terms, limit = nil, offset = nil, lang = nil) # ru
end

def suggest(collection, bucket, word, limit = nil)
arr = [collection, bucket, quote(word)]
arr = [collection, bucket, normalize(word)]
arr << "LIMIT(#{limit})" if limit

execute('SUGGEST', *arr) do
connection.read # ...
end
end

def list(collection, bucket, limit = nil, offset = nil)
arr = [collection, bucket]
arr << "LIMIT(#{limit})" if limit
arr << "OFFSET(#{offset})" if offset

execute('LIST', *arr) do
connection.read # ...
end
end
end
end
end
39 changes: 33 additions & 6 deletions lib/sonic/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,57 @@ def initialize(host, port, channel_type, password = nil)
end

def connect
read # ...
socket.gets # ...
write(['START', @channel_type, @password].compact.join(' '))
read.start_with?('STARTED ')
end

def disconnect
socket.close
socket&.close
socket = nil
end

def connected?
!socket.nil?
end

def read
data = socket.gets.chomp
raise ServerError, data if data.start_with?('ERR ')
data = socket.gets&.chomp

if data.nil?
# connection was dropped from timeout
disconnect
raise ServerError, "Connection expired. Please reconnect."
end

raise ServerError, "#{data.force_encoding('UTF-8')} (#{@last_write})" if data.start_with?('ENDED ')
raise ServerError, "#{data.force_encoding('UTF-8')} (#{@last_write})" if data.start_with?('ERR ')

data
end

def write(data)
socket.puts(data)
@last_write = data

begin
socket.puts(data)
rescue Errno::EPIPE => error
disconnect
raise ServerError, "Connection expired. Please reconnect.", error.backtrace
end
end


private

def socket
@socket ||= TCPSocket.open(@host, @port)
@socket ||= begin
socket = TCPSocket.open(@host, @port)
# disables Nagle's Algorithm, prevents multiple round trips with MULTI
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
socket
end
end

end
end