diff --git a/lib/druid/client.rb b/lib/druid/client.rb index e9660c5..8d5c33b 100644 --- a/lib/druid/client.rb +++ b/lib/druid/client.rb @@ -41,6 +41,49 @@ def query(id, &block) send query end + def select(id, from, to = Time.now, granularity = "all", page_size = 10000) + uri = data_source_uri(id) + raise "data source #{id} (currently) not available" unless uri + + select_query = Query.new(id, self).query_type(:select) + select_query.interval(from, to) + select_query.granularity(granularity) + + select_query.properties[:pagingSpec] = { + pagingIdentifiers: {}, + threshold: page_size + } + + done = false + begin + req = Net::HTTP::Post.new(uri.path, {'Content-Type' =>'application/json'}) + req.body = select_query.to_json + + response = Net::HTTP.new(uri.host, uri.port).start do |http| + http.read_timeout = TIMEOUT + http.request(req) + end + + if response.code == "200" + page = JSON.parse(response.body)[0]['result'] + + # take the cursor and update the select_query for next round + # please don't ever call this a "REST API" + cursor = page['pagingIdentifiers'] + cursor.each { |segment, position| cursor[segment] = position + 1} + select_query.properties[:pagingSpec][:pagingIdentifiers] = cursor + + page['events'].each do |row| + yield row['event'], row['offset'], row['segmentId'] + end + + done = page['events'].size < page_size + else + raise "Request failed: #{response.code}: #{response.body}" + end + end until done + end + def zookeeper_caching_management!(zookeeper_uri, opts) @zk = ZooHandler.new(zookeeper_uri, opts)