From fa0f376079ae199fe80f53419d5eccbe1c231e5d Mon Sep 17 00:00:00 2001 From: liubaohai Date: Sun, 20 May 2018 17:05:04 +0800 Subject: [PATCH 01/11] add redis proxy client --- lib/acid/redis_proxy_cli.lua | 251 ++++++++++++++++ t/redis_proxy_client_test.t | 540 +++++++++++++++++++++++++++++++++++ 2 files changed, 791 insertions(+) create mode 100644 lib/acid/redis_proxy_cli.lua create mode 100644 t/redis_proxy_client_test.t diff --git a/lib/acid/redis_proxy_cli.lua b/lib/acid/redis_proxy_cli.lua new file mode 100644 index 0000000..787c637 --- /dev/null +++ b/lib/acid/redis_proxy_cli.lua @@ -0,0 +1,251 @@ +--example, how to use +-- local opts = { +-- nwr = {3, 2, 2}, +-- ak_sk = {'accesskey', 'secret_key'}, +-- timeouts = {1, 1, 1}, +-- } +-- local cli = redis_proxy_cli:new(ip, port, opts) +-- +-- "retry(another N times) is optional" +-- cli:get(key, retry) +-- E.g. cli:get('key1') +-- E.g. cli:get('key1', 1) +-- +-- "expire(msec) and retry(another N times) are optional" +-- cli:set(key, val, expire, retry) +-- E.g. cli:set('key1', 'val1') +-- E.g. cli:set('key1', 'val1', 1000) +-- E.g. cli:set('key1', 'val1', nil, 2) +-- E.g. cli:set('key1', 'val1', 1000, 2) +-- +-- "retry(another N times) is optional" +-- cli:hget(hashname, hashkey, retry) +-- E.g. cli:hget('hashname1', 'hashkey1') +-- E.g. cli:hget('hashname1', 'hashkey1', 2) +-- +-- "expire(msec) and retry(another N times) are optional" +-- cli:hset(hashname, hashkey, val, expire, retry) +-- E.g. cli:hset('hashname1', 'hashkey1', 'val') +-- E.g. cli:hset('hashname1', 'hashkey1', 'val', 1000) +-- E.g. cli:hset('hashname1', 'hashkey1', 'val', nil, 2) +-- E.g. cli:hset('hashname1', 'hashkey1', 'val', 1000, 2) + +local http_cli = require("acid.httpclient") +local aws_signer = require('resty.awsauth.aws_signer') +local tableutil = require('acid.tableutil') +local strutil = require('acid.strutil') +local json = require('acid.json') + +local _M = { _VERSION = '1.0' } +local mt = { __index = _M } + +local to_str = strutil.to_str + +-- cmd: {"http method", "args count", "optional args names"} +local cmds = { + get = {"GET", 2, {}}, + set = {"PUT", 4, {"expire"}}, + hget = {"GET", 3, {}}, + hset = {"PUT", 5, {"expire"}}, +} + + +local function _sign_req(rp_cli, req) + if rp_cli.access_key == nil or rp_cli.secret_key == nil then + return nil, nil, nil + end + + local signer, err, errmsg = aws_signer.new(rp_cli.access_key, + rp_cli.secret_key, + {default_expires = 600}) + if err ~= nil then + return nil, err, errmsg + end + + local opts = { + query_auth = true, + sign_payload = (req['body'] ~= nil), + } + + return signer:add_auth_v4(req, opts) +end + + +local function _make_req_uri(rp_cli, params, opts, qs_values) + local path = tableutil.extends({rp_cli.ver}, params) + + local qs_list = { + string.format('n=%s', rp_cli.n), + string.format('w=%s', rp_cli.w), + string.format('r=%s', rp_cli.r), + } + + for i = 1, #opts do + if opts[i] ~= nil and qs_values[i] ~= nil then + table.insert(qs_list, string.format('%s=%s', opts[i], qs_values[i])) + end + end + + return string.format( + '%s?%s', + table.concat(path, '/'), + table.concat(qs_list, '&') + ) +end + + +local function _req(rp_cli, request) + local req = tableutil.dup(request, true) + req['headers'] = req['headers'] or {} + + if req['headers']['Host'] == nil then + req['headers']['Host'] = string.format('%s:%s', rp_cli.ip, rp_cli.port) + end + + if req['body'] ~= nil then + req['headers']['Content-Length'] = #req['body'] + end + + local _, err, errmsg = _sign_req(rp_cli, req) + if err ~= nil then + return nil, err, errmsg + end + + local cli = http_cli:new(rp_cli.ip, rp_cli.port, rp_cli.timeouts) + local req_opts = { + method = req['verb'], + headers = req['headers'], + body = req['body'], + } + + local _, err, errmsg = cli:request(req['uri'], req_opts) + if err ~= nil then + return nil, err, errmsg + end + + local res, err, errmsg = cli:read_body(100*1024*1024) + if err ~= nil then + return nil, err, errmsg + end + + if cli.status == 404 then + return nil, 'KeyNotFoundError', to_str('Uri:', req['uri']) + + elseif cli.status ~= 200 then + return nil, 'ServerResponseError', to_str('Res:', res, ' Uri:', req['uri']) + + end + + return res, nil, nil +end + + +local function _parse_args(cmd, args, args_cnt, http_mtd, opts) + + local path = {string.upper(cmd)} + + -- (args count) - (#opts) - "retry" + local path_args_cnt = args_cnt - #opts - 1 + if http_mtd == "PUT" then + -- remove body + path_args_cnt = path_args_cnt - 1 + end + + for _ = 1, path_args_cnt do + table.insert(path, args[1]) + table.remove(args, 1) + end + + local body + if http_mtd == "PUT" then + body = args[1] + table.remove(args, 1) + end + + local retry + if #args > #opts then + retry = args[#args] + table.remove(args, #args) + end + + return path, args, body, retry +end + + +local function _do_cmd(rp_cli, cmd, ...) + local cmd_info = cmds[cmd] + if cmd_info == nil then + local support_keys = tableutil.keys(cmds) + return nil, 'NotSupportCmd', to_str(cmd, ' not in ', support_keys) + end + + local args = {...} + local http_mtd, args_cnt, opts = cmd_info[1], cmd_info[2], cmd_info[3] + local path, qs_values, body, retry = _parse_args(cmd, args, args_cnt, http_mtd, opts) + + local req = { + verb = http_mtd, + uri = _make_req_uri(rp_cli, path, opts, qs_values), + } + + local res, err, errmsg + if body ~= nil then + req['body'], err = json.enc(body) + if err ~= nil then + return nil, err, to_str("json encode error:", body) + end + end + + retry = retry or 0 + for _ = 1, retry + 1 do + res, err, errmsg = _req(rp_cli, req) + if err == nil then + break + end + end + + if err ~= nil then + return nil, err, errmsg + end + + if http_mtd == 'GET' then + res, err = json.dec(res) + if err ~= nil then + return nil, err, to_str("json decode error:", res) + end + return res, nil, nil + end + + return nil, nil, nil +end + + +function _M.new(_, ip, port, opts) + opts = opts or {} + local nwr = opts.nwr or {3, 2, 2} + local ak_sk = opts.ak_sk or {} + local n, w, r = nwr[1], nwr[2], nwr[3] + local access_key, secret_key = ak_sk[1], ak_sk[2] + + return setmetatable({ + ver = '/redisproxy/v1', + ip = ip, + port = port, + n = n, + w = w, + r = r, + access_key = access_key, + secret_key = secret_key, + timeouts = opts.timeouts, + }, mt) +end + + +for cmd, _ in pairs(cmds) do + _M[cmd] = function (self, ...) + return _do_cmd(self, cmd, ...) + end +end + + +return _M diff --git a/t/redis_proxy_client_test.t b/t/redis_proxy_client_test.t new file mode 100644 index 0000000..ffafbcb --- /dev/null +++ b/t/redis_proxy_client_test.t @@ -0,0 +1,540 @@ +# vim:set ft=lua ts=4 sw=4 et: + +use Test::Nginx::Socket 'no_plan'; + +use Cwd qw(cwd); +my $pwd = cwd(); + +no_long_string(); + +our $HttpConfig = qq{ + + lua_package_path "$pwd/lib/?.lua;;"; + lua_package_cpath "$pwd/lib/?.so;;"; +}; + +run_tests(); + +__DATA__ + +=== TEST 1: test get +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local res, err, errmsg = cli:get("test_key") + if err ~= nil then + ngx.log(ngx.ERR, " get error", " err:", err, " msg:", errmsg) + end + + ngx.say(res) + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if ngx.var.uri ~= "/redisproxy/v1/GET/test_key" then + ngx.log(ngx.ERR, "uri error") + end + + if ngx.var.request_method ~= "GET" then + ngx.log(ngx.ERR, "http method error") + end + + ngx.say(json.enc("ok")) + '; + } +--- request +GET /t +--- response_body +ok +--- no_error_log +[error] + +=== TEST 2: test get with retry +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local res, err, errmsg = cli:get("test_key", 2) + if err ~= nil then + ngx.log(ngx.ERR, " get error", " err:", err, " msg:", errmsg) + end + + ngx.say(res) + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if ngx.var.uri ~= "/redisproxy/v1/GET/test_key" then + ngx.log(ngx.ERR, "uri error") + end + + if ngx.var.request_method ~= "GET" then + ngx.log(ngx.ERR, "http method error") + end + + ngx.say(json.enc("ok")) + '; + } +--- request +GET /t +--- response_body +ok +--- no_error_log +[error] + +=== TEST 3: test set +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local res, err, errmsg = cli:set("test_key", 2) + if err ~= nil then + ngx.log(ngx.ERR, " set error", " err:", err, " msg:", errmsg) + end + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if ngx.var.request_method ~= "PUT" then + ngx.log(ngx.ERR, "http method error") + end + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if qs.expire ~= nil then + ngx.log(ngx.ERR, "expire error") + end + + if ngx.var.uri ~= "/redisproxy/v1/SET/test_key" then + ngx.log(ngx.ERR, "uri error") + end + + ngx.req.read_body() + local value = ngx.req.get_body_data() + + if value ~= json.enc(2) then + ngx.log(ngx.ERR, "set value error") + end + '; + } +--- request +GET /t +--- no_error_log +[error] + +=== TEST 4: test set expire +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local val = {"1", "2"} + local res, err, errmsg = cli:set("test_key", val, 1000) + if err ~= nil then + ngx.log(ngx.ERR, " set error", " err:", err, " msg:", errmsg) + end + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if ngx.var.request_method ~= "PUT" then + ngx.log(ngx.ERR, "http method error") + end + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if qs.expire ~= "1000" then + ngx.log(ngx.ERR, "expire error") + end + + if ngx.var.uri ~= "/redisproxy/v1/SET/test_key" then + ngx.log(ngx.ERR, "uri error") + end + + ngx.req.read_body() + local value = ngx.req.get_body_data() + + if value ~= json.enc({"1", "2"}) then + ngx.log(ngx.ERR, "set value error") + end + '; + } +--- request +GET /t +--- no_error_log +[error] + +=== TEST 5: test set expire retry +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local val = {"1", "2"} + local res, err, errmsg = cli:set("test_key", val, 1000, 20) + if err ~= nil then + ngx.log(ngx.ERR, " set error", " err:", err, " msg:", errmsg) + end + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if ngx.var.request_method ~= "PUT" then + ngx.log(ngx.ERR, "http method error") + end + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if qs.expire ~= "1000" then + ngx.log(ngx.ERR, "expire error") + end + + if ngx.var.uri ~= "/redisproxy/v1/SET/test_key" then + ngx.log(ngx.ERR, "uri error") + end + + ngx.req.read_body() + local value = ngx.req.get_body_data() + + if value ~= json.enc({"1", "2"}) then + ngx.log(ngx.ERR, "set value error") + end + '; + } +--- request +GET /t +--- no_error_log +[error] + +=== TEST 6: test hget +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local res, err, errmsg = cli:hget("hashname1", "hashkey1") + if err ~= nil then + ngx.log(ngx.ERR, " hget error", " err:", err, " msg:", errmsg) + end + + ngx.say(res) + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if ngx.var.request_method ~= "GET" then + ngx.log(ngx.ERR, "http method error") + end + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if ngx.var.uri ~= "/redisproxy/v1/HGET/hashname1/hashkey1" then + ngx.log(ngx.ERR, "uri error") + end + + ngx.say(json.enc("ok")) + '; + } +--- request +GET /t +--- response_body +ok +--- no_error_log +[error] + +=== TEST 7: test hget with retry +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local res, err, errmsg = cli:hget("hashname1", "hashkey1", 2) + if err ~= nil then + ngx.log(ngx.ERR, " hget error", " err:", err, " msg:", errmsg) + end + + ngx.say(res) + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if ngx.var.request_method ~= "GET" then + ngx.log(ngx.ERR, "http method error") + end + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if ngx.var.uri ~= "/redisproxy/v1/HGET/hashname1/hashkey1" then + ngx.log(ngx.ERR, "uri error") + end + + ngx.say(json.enc("ok")) + '; + } +--- request +GET /t +--- response_body +ok +--- no_error_log +[error] + +=== TEST 8: test hset +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local res, err, errmsg = cli:hset("hashname1", "hashkey1", "val") + if err ~= nil then + ngx.log(ngx.ERR, " hset error", " err:", err, " msg:", errmsg) + end + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if ngx.var.request_method ~= "PUT" then + ngx.log(ngx.ERR, "http method error") + end + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if qs.expire ~= nil then + ngx.log(ngx.ERR, "expire error") + end + + if ngx.var.uri ~= "/redisproxy/v1/HSET/hashname1/hashkey1" then + ngx.log(ngx.ERR, "uri error") + end + + ngx.req.read_body() + local value = ngx.req.get_body_data() + + if value ~= json.enc("val") then + ngx.log(ngx.ERR, "hset value error") + end + '; + } +--- request +GET /t +--- no_error_log +[error] + +=== TEST 9: test hset expire +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local res, err, errmsg = cli:hset("hashname1", "hashkey1", "val", 1000) + if err ~= nil then + ngx.log(ngx.ERR, " hset error", " err:", err, " msg:", errmsg) + end + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if ngx.var.request_method ~= "PUT" then + ngx.log(ngx.ERR, "http method error") + end + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if qs.expire ~= "1000" then + ngx.log(ngx.ERR, "expire error") + end + + if ngx.var.uri ~= "/redisproxy/v1/HSET/hashname1/hashkey1" then + ngx.log(ngx.ERR, "uri error") + end + + ngx.req.read_body() + local value = ngx.req.get_body_data() + + if value ~= json.enc("val") then + ngx.log(ngx.ERR, "hset value error") + end + '; + } +--- request +GET /t +--- no_error_log +[error] + +=== TEST 10: test hset expire retry +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua ' + local rp = require("acid.redis_proxy_cli") + local cli = rp:new("127.0.0.1", ngx.var.server_port, {nwr={3,2,2},ak_sk={"ak","sk"}}) + + if cli == nil then + ngx.log(ngx.ERR, " create client error") + return + end + + local res, err, errmsg = cli:hset("hashname1", "hashkey1", {"1"}, 1000, 2) + if err ~= nil then + ngx.log(ngx.ERR, " hset error", " err:", err, " msg:", errmsg) + end + '; + } + + location /redisproxy/v1 { + content_by_lua ' + local json = require("acid.json") + local qs = ngx.req.get_uri_args() + + if ngx.var.request_method ~= "PUT" then + ngx.log(ngx.ERR, "http method error") + end + + if qs.n ~= "3" or qs.w ~= "2" or qs.r ~= "2" then + ngx.log(ngx.ERR, "nwr error") + end + + if qs.expire ~= "1000" then + ngx.log(ngx.ERR, "expire error") + end + + if ngx.var.uri ~= "/redisproxy/v1/HSET/hashname1/hashkey1" then + ngx.log(ngx.ERR, "uri error") + end + + ngx.req.read_body() + local value = ngx.req.get_body_data() + + if value ~= json.enc({"1"}) then + ngx.log(ngx.ERR, "hset value error") + end + '; + } +--- request +GET /t +--- no_error_log +[error] From 191ae9a056dd7d5a242dbdfa40e56c8d5d3e88c1 Mon Sep 17 00:00:00 2001 From: renzhi Date: Wed, 14 Mar 2018 10:44:09 +0800 Subject: [PATCH 02/11] add chash --- README.md | 1 + doc/acid/chash.md | 156 ++++++++++++++++ lib/acid/chash.lua | 448 +++++++++++++++++++++++++++++++++++++++++++++ lib/test_chash.lua | 317 ++++++++++++++++++++++++++++++++ t/chash.t | 43 +++++ 5 files changed, 965 insertions(+) create mode 100644 doc/acid/chash.md create mode 100644 lib/acid/chash.lua create mode 100644 lib/test_chash.lua create mode 100644 t/chash.t diff --git a/README.md b/README.md index 3542d45..60bbd37 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ It is meant to be a underlaying code base for building a distributed system. | name | description | status | | :-- | :-- | :-- | | [acid.async_cache](doc/acid/async_cache.md) | shared-dict based cache, update asynchronously. | well tested | +| [acid.chash](doc/acid/chash.md) | consistent hash module. | well tested | | [acid.strutil](doc/acid/strutil.md) | string operation functions. | well tested | | [acid.tableutil](doc/acid/tableutil.md) | table operation functions. | well tested | | [acid.unittest](doc/acid/unittest.md) | unittest engine that looks for test functions in a dir. | well tested | diff --git a/doc/acid/chash.md b/doc/acid/chash.md new file mode 100644 index 0000000..743f267 --- /dev/null +++ b/doc/acid/chash.md @@ -0,0 +1,156 @@ + + +# Table of Content + +- [Name](#name) +- [Status](#status) +- [Description](#description) +- [Synopsis](#synopsis) +- [Methods](#methods) + - [chash.new](#chashnew) + - [chash.choose_server](#chashchoose_server) + - [chash.update_server](#chashupdate_server) + - [chash.delete_server](#chashdelete_server) +- [Author](#author) +- [Copyright and License](#copyright-and-license) + + + +# Name + +acid.chash + +# Status + +This library is considered production ready. + +# Description + +Consistent hash module for ngx_lua. + +# Synopsis + +```lua +local chash = require('acid.chash') + +local servers = { + server_1 = 128, + server_2 = 128, + server_3 = 256, +} + +local c_hash, err, errmsg = chash.new(servers) +if err ~= nil then + ngx.say('error') +end + +local server_names, err, errmsg = c_hash:choose_server( + 'key_foo', {nr_choose=2}) +if err ~= nil then + ngx.say('error') +end + +-- server_names +{server_x1, servers_x2} + +local _, err, errmsg = c_hash:update_server({server_4 = 256}) +if err ~= nil then + ngx.say('error') +end + +local _, err, errmsg = c_hash:delete_server({'server_4'}) +if err ~= nil then + ngx.say('error') +end +``` + +# Methods + +## chash.new + +**syntax**: +`c_hash, err, errmsg = chash.new(servers, opts)` + +Create a new consistent hash object. + +**arguments**: + +- `servers`: + A lua table. The key is server name, value is the number of virtual + node for that server. + +- `opts`: + is a lua table holding the following keys: + + - `debug`: + set to `true` if need to calculate consistent rate and load + distribution. Default to `False`. + +**return**: +A consistent hash object. In case of failures, return `nil` and error code +and error message. + +## chash.choose_server + +**syntax**: +`server_names, err, errmsg = c_hash:choose_server(key, opts)` + +Choose server. + +**arguments**: + +- `key`: + is a string. + +- `opts`: + is a lua table holding the following keys: + + - `nr_choose`: + Specify how many servers to choose. Default to 1. + +**return**: +A table contains server names. + +## chash.update_server + +**syntax**: +`info, err, errmsg = c_hash:update_server(servers)` + +Add new servers or update virtual node number for servers. + +**arguments**: + +- `servers`: + A lua table. The key is server name, value is the number of virtual + node for that server. + +**return**: +If set `debug` to `true` when create consistent hash object, return a +lua table contains two fields 'consistent_rate' and 'load_distribution', +or return a empty table. In case of failures, return `nil` and error code +and error message. + +## chash.delete_server + +**syntax**: +`info, err, errmsg = c_hash:delete_server(server_names)` + +Delete servers. + +**arguments**: + +- `server_names`: + A lua table, contains name of servers to delete. + +**return**: +Same as `chash.update_server`. + +# Author + +Renzhi (任稚) + +# Copyright and License + +The MIT License (MIT) + +Copyright (c) 2015 Renzhi (任稚) diff --git a/lib/acid/chash.lua b/lib/acid/chash.lua new file mode 100644 index 0000000..8d29b0c --- /dev/null +++ b/lib/acid/chash.lua @@ -0,0 +1,448 @@ +local strutil = require('acid.strutil') +local tableutil = require('acid.tableutil') +local time = require('acid.time') + +local to_str = strutil.to_str +local NR_BUCKET = math.pow(2, 32) + +local _M = {} + +local _mt = {__index = _M} + + +local function compare_position(point1, point2) + if point1.position < point2.position then + return true + end + + return false +end + + +local function sort_and_spread(points) + local none_empty_index + + local max_collision = 0 + + for index, points_in_one_index in ipairs(points) do + if index % 1000 == 0 then + ngx.sleep(0.001) + end + + local collision = #points_in_one_index + + if collision > 0 then + none_empty_index = index + end + + if collision > 1 then + table.sort(points_in_one_index, compare_position) + end + + if collision > max_collision then + max_collision = collision + end + end + + ngx.log(ngx.INFO, string.format( + 'hash of %d points, max collision is: %d', + #points, max_collision)) + + local curr_index = none_empty_index + + for _ = 1, #points - 1 do + local pre_index = curr_index - 1 + if pre_index == 0 then + pre_index = #points + end + + if #points[pre_index] == 0 then + table.insert(points[pre_index], points[curr_index][1]) + end + + curr_index = pre_index + end +end + + +local function get_position_list(points) + local positions = {} + + for _, points_in_one_index in ipairs(points) do + for _, point in ipairs(points_in_one_index) do + if #positions == 0 or + point.position > positions[#positions].position then + table.insert(positions, + {position = point.position, + server_name = point.server_name}) + end + end + end + + return positions +end + + +local function merge_sort_postions(old_positions, new_positions) + local merged_positions = {} + + local index_old = 1 + local index_new = 1 + + while true do + if index_old > #old_positions then + break + end + + if index_new > #new_positions then + break + end + + local old_position = old_positions[index_old] + local new_position = new_positions[index_new] + + if old_position.position <= new_position.position then + table.insert(merged_positions, + {old_position.position, + old_position.server_name, + new_position.server_name}) + index_old = index_old + 1 + else + table.insert(merged_positions, + {new_position.position, + old_position.server_name, + new_position.server_name}) + index_new = index_new + 1 + end + end + + if index_old <= #old_positions then + for index = index_old, #old_positions do + table.insert(merged_positions, + {old_positions[index].position, + old_positions[index].server_name, + new_positions[1].server_name}) + end + end + + if index_new <= #new_positions then + for index = index_new, #new_positions do + table.insert(merged_positions, + {new_positions[index].position, + old_positions[1].server_name, + new_positions[index].server_name}) + end + end + + return merged_positions +end + + +local function calc_consistent_rate(old_points, new_points) + local old_positions = get_position_list(old_points) + if #old_positions == 0 then + return 0 + end + + local new_positions = get_position_list(new_points) + if #new_positions == 0 then + return 0 + end + + local merged_positions = merge_sort_postions(old_positions, new_positions) + + local consistent_n = 0 + + for index = 1, #merged_positions do + local bucket_n + if index == 1 then + bucket_n = NR_BUCKET - merged_positions[#merged_positions][1] + + merged_positions[index][1] + else + bucket_n = merged_positions[index][1] - + merged_positions[index - 1][1] + end + + if merged_positions[index][2] == merged_positions[index][3] then + consistent_n = consistent_n + bucket_n + end + end + + return consistent_n / NR_BUCKET +end + + +local function calc_load_distribution(points) + local load_distribution = {} + + local positions = get_position_list(points) + + for index = 1, #positions do + local bucket_n + if index == 1 then + bucket_n = NR_BUCKET - positions[#positions].position + + positions[index].position + else + bucket_n = positions[index].position - + positions[index - 1].position + end + + local server_name = positions[index].server_name + if load_distribution[server_name] == nil then + load_distribution[server_name] = 0 + end + + load_distribution[server_name] = load_distribution[server_name] + bucket_n + end + + for server_name, bucket_n in pairs(load_distribution) do + load_distribution[server_name] = bucket_n / NR_BUCKET + end + + return load_distribution +end + + +function _M.reinit(c_hash) + local points = {} + local nr_point = 0 + + local init_ms_used = {} + local start_ts = time.get_ms() + + for _, nr_virtual_node in pairs(c_hash.servers) do + nr_point = nr_point + nr_virtual_node + end + + for point_index = 1, nr_point do + points[point_index] = {} + end + + init_ms_used.init_array = time.get_ms() - start_ts + + local count = 0 + for server_name, nr_virtual_node in pairs(c_hash.servers) do + count = count + 1 + if count % 100 == 0 then + ngx.sleep(0.001) + end + for i = 1, nr_virtual_node do + local hash_key = server_name .. tostring(i) + local hash_code = c_hash.hash_func(hash_key) + hash_code = (hash_code % NR_BUCKET) + 1 + + local point = { + server_name = server_name, + virtual_node_seq = i, + position = hash_code, + } + + local point_index = math.ceil( + nr_point * hash_code / NR_BUCKET) + + table.insert(points[point_index], point) + end + end + + init_ms_used.hash = time.get_ms() - start_ts + + sort_and_spread(points) + + init_ms_used.sort = time.get_ms() - start_ts + + if c_hash.debug then + local consistent_rate = calc_consistent_rate(c_hash.points, points) + ngx.log(ngx.INFO, string.format('after reinit, consistent rate is: %f', + consistent_rate)) + c_hash.consistent_rate = consistent_rate + + init_ms_used.consistent_rate = time.get_ms() - start_ts + + local load_distribution = calc_load_distribution(points) + ngx.log(ngx.INFO, string.format('after reinit, load distribution is: %s', + to_str(load_distribution))) + c_hash.load_distribution = load_distribution + + init_ms_used.load_distribution = time.get_ms() - start_ts + end + + c_hash.init_ms_used = init_ms_used + ngx.log(ngx.INFO, 'init ms used: ' .. to_str(init_ms_used)) + + c_hash.points = points + c_hash.nr_point = nr_point + + return c_hash, nil, nil +end + + +function _M.new(servers, opts) + if opts == nil then + opts = {} + end + + local c_hash = { + servers = servers, + server_names = tableutil.keys(servers), + points = {}, + nr_point = 0, + hash_func = opts.hash_func or ngx.crc32_long, + debug = opts.debug == true, + } + + local _, err, errmsg = _M.reinit(c_hash) + if err ~= nil then + return nil, err, errmsg + end + + return setmetatable(c_hash, _mt), nil, nil +end + + +function _M.scan_for_extra_server(self, scan_start_index, scan_sub_index, + nr_choose, chose_servers) + local curr_index = scan_start_index + local nr_scaned = 0 + local enough = false + + for n = 1, self.nr_point + 1 do + if n ~= 1 then + scan_sub_index = 1 + end + local points_in_one_index = self.points[curr_index] + + for i = scan_sub_index, #points_in_one_index do + local p = points_in_one_index[i] + + nr_scaned = nr_scaned + 1 + + if not tableutil.has(chose_servers, p.server_name) then + table.insert(chose_servers, p.server_name) + if #chose_servers >= nr_choose then + enough = true + break + end + end + end + + if enough then + break + end + + curr_index = curr_index + 1 + if curr_index >= self.nr_point then + curr_index = 1 + end + end + + if nr_scaned > 100 then + ngx.log(ngx.WARN, string.format('scaned %d points to find %d servers', + nr_scaned, nr_choose)) + end + + if not enough then + return nil, 'FindServerError', string.format( + 'failed to find %d servers, scaned %d points', + nr_choose, nr_scaned) + end +end + + +function _M.choose_server(self, key, opts) + if opts == nil then + opts = {} + end + + local nr_choose = opts.nr_choose or 1 + + if nr_choose > #self.server_names then + return nil, 'ServerNotEnough', string.format( + 'server number: %d is less than %d', + #self.server_names, nr_choose) + end + + if nr_choose == #self.server_names then + return tableutil.dup(self.server_names, true), nil, nil + end + + local hash_code = self.hash_func(key) + hash_code = (hash_code % NR_BUCKET) + 1 + + local start_index = math.ceil( + self.nr_point * hash_code / NR_BUCKET) + + local next_index = start_index + 1 + if next_index > self.nr_point then + next_index = 1 + end + + local points = self.points[start_index] + local point = points[#points] + + local scan_start_index = start_index + local scan_sub_index = 1 + + if hash_code > point.position then + point = self.points[next_index][1] + scan_start_index = next_index + scan_sub_index = 2 + else + for i = #points - 1, 1, -1 do + if points[i].position >= hash_code then + point = points[i] + scan_sub_index = i + 1 + end + end + end + + local chose_servers = {point.server_name} + + if nr_choose == 1 then + return chose_servers + end + + local _, err, errmsg = self:scan_for_extra_server( + scan_start_index, scan_sub_index, nr_choose, chose_servers) + if err ~= nil then + return nil, err, errmsg + end + + return chose_servers +end + + +function _M.update_server(self, servers) + tableutil.update(self.servers, servers) + + self.server_names = tableutil.keys(self.servers) + + _M.reinit(self) + + local info = { + load_distribution = self.load_distribution, + consistent_rate = self.consistent_rate, + } + + return info, nil, nil +end + + +function _M.delete_server(self, server_names) + for _, server_name in ipairs(server_names) do + self.servers[server_name] = nil + end + + self.server_names = tableutil.keys(self.servers) + + _M.reinit(self) + + local info = { + load_distribution = self.load_distribution, + consistent_rate = self.consistent_rate, + } + + return info, nil, nil +end + + +return _M diff --git a/lib/test_chash.lua b/lib/test_chash.lua new file mode 100644 index 0000000..1d59e5b --- /dev/null +++ b/lib/test_chash.lua @@ -0,0 +1,317 @@ +local chash = require('acid.chash') +local time = require('acid.time') + + +math.randomseed(ngx.now() * 1000) + + +function test.basic(t) + for _, nr_server, nr_vn, nr_choose, exp_err, desc in t:case_iter(4, { + {0, 0, 0, nil}, + {0, 0, 1, 'ServerNotEnough'}, + {1, 1, 1, nil }, + {1, 1, 2, 'ServerNotEnough'}, + {2, 1, 2, nil }, + {2, 1000, 2, nil }, + {2, 1000, 3, 'ServerNotEnough'}, + {2, 100, 2, nil }, + {100, 100, 100, nil }, + }) do + local servers = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn + end + + local c_hash, err, errmsg = chash.new(servers) + t:eq(nil, err, errmsg) + + local server_names, err, errmsg = c_hash:choose_server( + 'key_foo', {nr_choose=nr_choose}) + t:eq(exp_err, err, desc) + if err == nil then + t:eq(nr_choose, #server_names, desc) + end + end +end + + +function test.consistent_rate(t) + local server_names = { + 'server_' .. tostring(math.random(1, 99999)), + 'server_' .. tostring(math.random(1, 99999)), + 'server_' .. tostring(math.random(1, 99999)), + } + + local c_hash, err, errmsg = chash.new({[server_names[1]] = 1024}, + {debug = true}) + t:eq(nil, err, errmsg) + + t:eq(0, c_hash.consistent_rate) + t:eq(1, c_hash.load_distribution[server_names[1]]) + + local info, err, errmsg = c_hash:update_server({[server_names[2]] = 1024}) + t:eq(nil, err, errmsg) + test.dd(info) + + t:eq(true, 0.1 > math.abs(info.consistent_rate - 0.5)) + t:eq(true, 0.1 > math.abs(info.load_distribution[server_names[1]] - 0.5)) + t:eq(true, 0.1 > math.abs(info.load_distribution[server_names[2]] - 0.5)) + + local info, err, errmsg = c_hash:update_server({[server_names[3]] = 1024}) + t:eq(nil, err, errmsg) + test.dd(info) + + t:eq(true, 0.1 > math.abs(info.consistent_rate - 2.0/3.0)) + t:eq(true, 0.1 > math.abs(info.load_distribution[server_names[1]] - 1.0/3.0)) + t:eq(true, 0.1 > math.abs(info.load_distribution[server_names[2]] - 1.0/3.0)) + t:eq(true, 0.1 > math.abs(info.load_distribution[server_names[3]] - 1.0/3.0)) +end + + +function test.consistent_rate_virtual_node_number_change(t) + for _, nr_server, nr_vn_old, nr_vn_new, rate, _ in t:case_iter(4, { + {200, 100, 50, 0.5}, + {200, 100, 80, 0.8}, + {100, 1000, 600, 0.6}, + {1000, 100, 40, 0.4}, + }) do + local servers = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn_old + end + + local c_hash, err, errmsg = chash.new(servers, {debug = true}) + t:eq(nil, err, errmsg) + + for server_name, _ in pairs(servers) do + servers[server_name] = nr_vn_new + end + local info, err, errmsg = c_hash:update_server(servers) + t:eq(nil, err, errmsg) + + test.dd(info.consistent_rate) + t:eq(true, 0.05 > math.abs(info.consistent_rate - rate)) + end +end + + +function test.consistent_rate_add_server(t) + for _, nr_server, nr_server_add, nr_vn, rate, _ in t:case_iter(4, { + {200, 200, 128, 0.5}, + {270, 30, 128, 0.9}, + }) do + local servers = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn + end + + local c_hash, err, errmsg = chash.new(servers, {debug = true}) + t:eq(nil, err, errmsg) + + local add_servers = {} + for _ = 1, nr_server_add do + local server_name = 'test_server_' .. tostring(math.random(99999)) + add_servers[server_name] = nr_vn + end + + local info, err, errmsg = c_hash:update_server(add_servers) + t:eq(nil, err, errmsg) + + test.dd(info.consistent_rate) + t:eq(true, 0.05 > math.abs(info.consistent_rate - rate)) + end +end + + +function test.consistent_rate_real(t) + for _, nr_server, nr_server_add, nr_vn, rate, _ in t:case_iter(4, { + {200, 200, 128, 0.5}, + {90, 10, 128, 0.9}, + }) do + local keys = {} + for _ = 1, 1000 * 10 do + table.insert(keys, 'test-key' .. tostring(math.random(9999999))) + end + + local servers = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn + end + + local c_hash, err, errmsg = chash.new(servers, {debug = true}) + t:eq(nil, err, errmsg) + + local old_server = {} + for _, key in ipairs(keys) do + local server_names, err, errmsg = c_hash:choose_server(key) + t:eq(nil, err, errmsg) + old_server[key] = server_names[1] + end + + local add_servers = {} + for _ = 1, nr_server_add do + local server_name = 'test_server_' .. tostring(math.random(99999)) + add_servers[server_name] = nr_vn + end + + local _, err, errmsg = c_hash:update_server(add_servers) + t:eq(nil, err, errmsg) + + local new_server = {} + for _, key in ipairs(keys) do + local server_names, err, errmsg = c_hash:choose_server(key) + t:eq(nil, err, errmsg) + new_server[key] = server_names[1] + end + + local consistent_n = 0 + for _, key in ipairs(keys) do + if old_server[key] == new_server[key] then + consistent_n = consistent_n + 1 + end + end + + local real_rate = consistent_n / #keys + t:eq(true, 0.05 > math.abs(real_rate - rate)) + end +end + + +function test.consistent_rate_real_pick_3(t) + for _, nr_server, nr_server_add, nr_vn, rate, _ in t:case_iter(4, { + {200, 200, 100, 0.5}, + {180, 20, 100, 0.9}, + }) do + local keys = {} + for _ = 1, 1000 * 10 do + table.insert(keys, 'test-key' .. tostring(math.random(9999999))) + end + + local servers = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn + end + + local c_hash, err, errmsg = chash.new(servers) + t:eq(nil, err, errmsg) + + local old_server = {{}, {}, {}} + for _, key in ipairs(keys) do + local server_names, err, errmsg = c_hash:choose_server( + key, {nr_choose = 3}) + t:eq(nil, err, errmsg) + old_server[1][key] = server_names[1] + old_server[2][key] = server_names[2] + old_server[3][key] = server_names[3] + end + + for _ = 1, nr_server_add do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn + end + + local _, err, errmsg = c_hash:update_server(servers) + t:eq(nil, err, errmsg) + + local new_server = {{}, {}, {}} + for _, key in ipairs(keys) do + local server_names, err, errmsg = c_hash:choose_server( + key, {nr_choose = 3}) + t:eq(nil, err, errmsg) + new_server[1][key] = server_names[1] + new_server[2][key] = server_names[2] + new_server[3][key] = server_names[3] + end + + local real_rate = {} + for i = 1, 3 do + local consistent_n = 0 + for _, key in ipairs(keys) do + if old_server[i][key] == new_server[i][key] then + consistent_n = consistent_n + 1 + end + end + + table.insert(real_rate, consistent_n / #keys) + end + + t:eq(true, 0.05 > math.abs(real_rate[1] - rate)) + end +end + + +function test.performance_choose_server(t) + for _, nr_server, nr_vn, nr_choose, nr_rep, ms_use, desc in t:case_iter(5, { + { 200, 1024, 1, 1024 * 10, 1000}, + { 200, 1024, 10, 1024 * 10, 1000}, + { 400, 512, 1, 1024 * 10, 1000}, + { 400, 512, 10, 1024 * 10, 1000}, + { 4000, 128, 1, 1024 * 10, 1000}, + + }) do + + local servers = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn + end + + local c_hash, err, errmsg = chash.new(servers) + t:eq(nil, err, errmsg) + + local keys = {} + for _ = 1, nr_rep do + table.insert(keys, 'test-key' .. tostring(math.random(9999999))) + end + + local start_ms = time.get_ms() + + for key in ipairs(keys) do + c_hash:choose_server(key, {nr_choose = nr_choose}) + end + + local end_ms = time.get_ms() + + local ms_used = end_ms - start_ms + + test.dd(string.format('ms used for: %s is: %d', desc, ms_used)) + + t:eq(true, ms_used < ms_use) + end +end + + +function test.performance_reinit(t) + for _, nr_server, nr_vn, nr_rep, ms_use, desc in t:case_iter(4, { + { 200, 100, 1, 2000}, + { 2000, 10, 1, 2000}, + }) do + + local servers = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn + end + + local start_ms = time.get_ms() + + for _ = 1, nr_rep do + local c_hash, err, errmsg = chash.new(servers) + t:eq(nil, err, errmsg) + t:neq(nil, c_hash, desc) + end + + local end_ms = time.get_ms() + + local ms_used = end_ms - start_ms + + test.dd(string.format('ms used for: %s is: %d', desc, ms_used)) + + t:eq(true, ms_use > ms_used, desc) + end +end diff --git a/t/chash.t b/t/chash.t new file mode 100644 index 0000000..ccbe221 --- /dev/null +++ b/t/chash.t @@ -0,0 +1,43 @@ +# vim:set ft=lua ts=4 sw=4 et ft=perl: + +################################################################################ +# DO NOT EDIT THIS FILE. # +# Use ./t/build_ngx_ut.sh to regenerate this wrapper file. # +################################################################################ + +use Test::Nginx::Socket "no_plan"; + +no_long_string(); + +# Env TEST_VERBOSE is set by command "prove" +# +# Only env var starting with TEST_NGINX_ will be evaluated in the "--- config" +# block. +$ENV{TEST_NGINX_ACID_UT_VERBOSE} = $ENV{TEST_VERBOSE} || 0; + +run_tests(); + +__DATA__ + +=== TEST 1: chash +--- http_config + lua_shared_dict shared_dict_lock 1m; + lua_shared_dict test_shared 10m; + lua_check_client_abort on; + + lua_package_path "./lib/?.lua;;"; + lua_package_cpath "./lib/?.so;;"; +--- config + location /t { + content_by_lua_block { + require("acid.unittest").ngx_test_modules( + { "test_chash", }, + { debug = ($TEST_NGINX_ACID_UT_VERBOSE == 1), } + ) + } + } +--- request +GET /t +--- response_body_like +.*tests all passed.* +--- timeout: 300 From 6f95904aebf0b0be68ef0b4326aa4ae9181c05e1 Mon Sep 17 00:00:00 2001 From: renzhi Date: Mon, 19 Mar 2018 20:33:10 +0800 Subject: [PATCH 03/11] add consistenthash test --- lib/test_consistenthash.lua | 137 ++++++++++++++++++++++++++++++++++++ t/consistenthash.t | 43 +++++++++++ 2 files changed, 180 insertions(+) create mode 100644 lib/test_consistenthash.lua create mode 100644 t/consistenthash.t diff --git a/lib/test_consistenthash.lua b/lib/test_consistenthash.lua new file mode 100644 index 0000000..d0a567f --- /dev/null +++ b/lib/test_consistenthash.lua @@ -0,0 +1,137 @@ +local chash = require('acid.chash') +local consistenthash = require('consistenthash') +local time = require('acid.time') + + +math.randomseed(ngx.now() * 1000) + + +function test.consistenthash_get(t) + for _, nr_server, nr_choose, nr_rep, desc in t:case_iter(3, { + {500, 1, 1000 * 10}, + + }) do + + local server_names = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + table.insert(server_names, server_name) + end + + local c_hash = consistenthash:new(server_names) + + local keys = {} + for _ = 1, nr_rep do + table.insert(keys, 'test-key' .. tostring(math.random(9999999))) + end + + local start_ms = time.get_ms() + + for key in ipairs(keys) do + c_hash:get(key, nr_choose) + end + + local end_ms = time.get_ms() + + local ms_used = end_ms - start_ms + + test.dd(string.format('consistenthash get ms used for: %s is: %d', + desc, ms_used)) + end +end + + +function test.consistenthash_new(t) + for _, nr_server, nr_rep, desc in t:case_iter(2, { + {500, 1}, + + }) do + + local server_names = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + table.insert(server_names, server_name) + end + + local start_ms = time.get_ms() + + for _ = 1, nr_rep do + local c_hash = consistenthash:new(server_names) + end + + local end_ms = time.get_ms() + + local ms_used = end_ms - start_ms + + test.dd(string.format('consistenthash new ms used for: %s is: %d', + desc, ms_used)) + + --ngx.log(ngx.ERR, 'worker pid: ' .. tostring(ngx.worker.pid())) + --ngx.sleep(100) + end +end + + +function test.chash_choose_server(t) + for _, nr_server, nr_vn, nr_choose, nr_rep, desc in t:case_iter(4, { + {500, 160, 1, 1000 * 10}, + + }) do + + local servers = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn + end + + local c_hash, err, errmsg = chash.new(servers) + t:eq(nil, err, errmsg) + + local keys = {} + for _ = 1, nr_rep do + table.insert(keys, 'test-key' .. tostring(math.random(9999999))) + end + + local start_ms = time.get_ms() + + for key in ipairs(keys) do + c_hash:choose_server(key, {nr_choose = nr_choose}) + end + + local end_ms = time.get_ms() + + local ms_used = end_ms - start_ms + + test.dd(string.format('chash choose server ms used for: %s is: %d', + desc, ms_used)) + end +end + + +function test.chash_new(t) + for _, nr_server, nr_vn, nr_rep, desc in t:case_iter(3, { + {500, 160, 1}, + }) do + + local servers = {} + for _ = 1, nr_server do + local server_name = 'test_server_' .. tostring(math.random(99999)) + servers[server_name] = nr_vn + end + + local start_ms = time.get_ms() + + for _ = 1, nr_rep do + local c_hash, err, errmsg = chash.new(servers) + end + + local end_ms = time.get_ms() + + local ms_used = end_ms - start_ms + + test.dd(string.format('chash new ms used for: %s is: %d', + desc, ms_used)) + --ngx.log(ngx.ERR, 'worker pid: ' .. tostring(ngx.worker.pid())) + --ngx.sleep(100) + end +end diff --git a/t/consistenthash.t b/t/consistenthash.t new file mode 100644 index 0000000..813e232 --- /dev/null +++ b/t/consistenthash.t @@ -0,0 +1,43 @@ +# vim:set ft=lua ts=4 sw=4 et ft=perl: + +################################################################################ +# DO NOT EDIT THIS FILE. # +# Use ./t/build_ngx_ut.sh to regenerate this wrapper file. # +################################################################################ + +use Test::Nginx::Socket "no_plan"; + +no_long_string(); + +# Env TEST_VERBOSE is set by command "prove" +# +# Only env var starting with TEST_NGINX_ will be evaluated in the "--- config" +# block. +$ENV{TEST_NGINX_ACID_UT_VERBOSE} = $ENV{TEST_VERBOSE} || 0; + +run_tests(); + +__DATA__ + +=== TEST 1: consistenthash +--- http_config + lua_shared_dict shared_dict_lock 1m; + lua_shared_dict test_shared 10m; + lua_check_client_abort on; + + lua_package_path "./lib/?.lua;;"; + lua_package_cpath "./lib/?.so;;"; +--- config + location /t { + content_by_lua_block { + require("acid.unittest").ngx_test_modules( + { "test_consistenthash", }, + { debug = ($TEST_NGINX_ACID_UT_VERBOSE == 1), } + ) + } + } +--- request +GET /t +--- response_body_like +.*tests all passed.* +--- timeout: 300 From 7261426a1338480d5ac7d71da32690951a813e66 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Wed, 16 May 2018 14:40:48 +0800 Subject: [PATCH 04/11] add redis.lua --- lib/acid/redis.lua | 141 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 lib/acid/redis.lua diff --git a/lib/acid/redis.lua b/lib/acid/redis.lua new file mode 100644 index 0000000..e3900e1 --- /dev/null +++ b/lib/acid/redis.lua @@ -0,0 +1,141 @@ +local resty_redis = require("resty.redis") +local strutil = require("acid.strutil") +local rpc_logging = require("acid.rpc_logging") + +local to_str = strutil.to_str + +local _M = {} +local mt = { __index = _M } + +local function get_redis_cli(self) + local redis_cli, err_msg = resty_redis:new() + if redis_cli == nil then + return nil, nil, 'NewRedisError', err_msg + end + + redis_cli:set_timeout(self.timeout) + + local ok, err_msg = redis_cli:connect( self.ip, self.port ) + if ok == nil then + return nil, 'ConnectRedisError', err_msg + end + + return redis_cli +end + +local function run_redis_cmd(self, cmd, ...) + local args = {...} + + local log_entry = rpc_logging.new_entry('redis', { + ip = self.ip, + port = self.port, + uri = to_str(cmd, ':', args[1])}) + + local redis_cli, err_code, err_msg = get_redis_cli(self) + rpc_logging.set_time(log_entry, 'upstream', 'conn') + if err_code ~= nil then + rpc_logging.set_err(log_entry, err_code) + rpc_logging.add_log(log_entry) + return nil, err_code, err_msg + end + + local val, err_msg = redis_cli[cmd](redis_cli, ... ) + rpc_logging.set_time(log_entry, 'upstream', 'recv') + + if val == nil or err_msg ~= nil then + rpc_logging.set_err(log_entry, err_msg) + rpc_logging.add_log(log_entry) + return nil, 'RunRedisCMDError', to_str('cmd: ', cmd, ', err: ', err_msg) + end + + local itv = log_entry.upstream.time.conn + log_entry.upstream.time.recv + if itv >= self.min_log_time then + rpc_logging.add_log(log_entry) + end + + if self.keepalive_timeout ~= nil then + redis_cli:set_keepalive(self.keepalive_timeout, self.keepalive_size) + end + + return val +end + +function _M.new(_, ip, port, opts) + local opts = opts or {} + + local obj = { + ip = ip, + port = port, + timeout = opts.timeout or 1000, + + retry_count = opts.retry_count or 1, + + keepalive_size = opts.keepalive_size, + keepalive_timeout = opts.keepalive_timeout, + + min_log_time = opts.min_log_time or 0.005, + } + + return setmetatable( obj, mt ) +end + +function _M.retry(self, n) + self.retry_count = n + + return self +end + +function _M.transaction(self, cmds) + local ok, err_code, err_msg = self:multi() + if err_code ~= nil then + return nil, err_code, err_msg + end + + if ok ~= 'OK' then + return nil, 'RunRedisCMDError', 'multi no reply with the string OK' + end + + for _, cmd_and_args in ipairs(cmds) do + local cmd, cmd_args = unpack(cmd_and_args) + local rst, err_code, err_msg = self[cmd](self, unpack(cmd_args or {})) + if err_code ~= nil then + self['discard'](self) + return nil, err_code, err_msg + end + + if rst ~= 'QUEUED' then + self['discard'](self) + return nil, 'RunRedisCMDError', cmd .. ' no reply with the string QUEUED' + end + end + + local multi_rst, err_code, err_msg = self['exec'](self) + if err_code ~= nil then + self['discard'](self) + return nil, err_code, err_msg + end + + return multi_rst +end + +setmetatable(_M, {__index = function(_, cmd) + local method = function (self, ...) + local val, err_code, err_msg + + for ii = 1, self.retry_count, 1 do + val, err_code, err_msg = run_redis_cmd(self, cmd, ...) + if err_code == nil then + return val, nil, nil + end + + ngx.log(ngx.WARN, to_str('redis retry ', ii, ' error. ', err_code, ':', err_msg)) + end + + return nil, err_code, err_msg + end + + _M[cmd] = method + return method +end}) + +return _M From 1325fc463b4a199c75b0864134cab2901a5579b9 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Wed, 16 May 2018 14:41:30 +0800 Subject: [PATCH 05/11] add redis_chash.lua --- lib/acid/redis_chash.lua | 237 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 237 insertions(+) create mode 100644 lib/acid/redis_chash.lua diff --git a/lib/acid/redis_chash.lua b/lib/acid/redis_chash.lua new file mode 100644 index 0000000..59df62c --- /dev/null +++ b/lib/acid/redis_chash.lua @@ -0,0 +1,237 @@ +local strutil = require("acid.strutil") +local tableutil = require("acid.tableutil") +local acid_redis = require("acid.redis") +local ngx_timer = require("acid.ngx_timer") +local acid_chash = require( "acid.chash" ) +local semaphore = require("ngx.semaphore") + +local to_str = strutil.to_str +local str_split = strutil.split + +local _M = {} +local mt = { __index = _M } + +--redis_conf = { + --['cluster_redis'] = { + --expires = 0, + --updates = 0, + --semaphore = semaphore.new(1), + --servers = {}, + --chash = nil, + --get_redis_servers = get_redis_servers, + --} +--} +-- +local redis_conf = {} +local servers_expires_seconds = 600 +local servers_updates_seconds = 300 + +local function update_chash(self) + local conf = self.conf + + local _, err = conf.semaphore:wait(1) + if err then + ngx.log(ngx.INFO, err, " other timer is updating servers") + return + end + + local now = ngx.time() + if now < conf.updates then + ngx.log(ngx.INFO, "servers were updated by other timer") + + conf.semaphore:post(1) + return + end + + local servers = self.conf.get_redis_servers() + if servers == nil then + conf.semaphore:post(1) + return + end + + local now = ngx.time() + if conf.chash == nil then + conf.chash = acid_chash.new(servers) + else + conf.chash:update_server(servers) + end + + conf.servers = servers + + conf.expires = now + servers_expires_seconds + conf.updates = now + servers_updates_seconds + + ngx.log(ngx.INFO, "consistent hash built") + conf.semaphore:post(1) +end + +local function get_chash(self) + local conf = self.conf + local now = ngx.time() + + if now >= conf.updates then + if conf.updates == 0 then + update_chash(self) + else + ngx_timer.at(0.01, update_chash, self) + end + end + + if now < conf.expires then + return conf.chash + else + ngx.log(ngx.ERR, "consistent servers was expired") + end + + return nil +end + +local function optimize_choose_servers(addrs, least) + return addrs +end + +local function get_redis_addrs(self, k, n, least) + local chash = get_chash(self) + if chash == nil then + return {} + end + + local addrs, err_code, err_msg = chash:choose_server(k, {nr_choose=n}) + if err_code ~= nil then + return nil, err_code, err_msg + end + + return self.conf.optimize_choose_servers(addrs, least) +end + +local function run_cmd_on_redis(ip, port, cmd, cmd_args, pexpire) + local r_opts = { + retry_count = 1, + tiemout = 1000, + keepalive_timeout = 10 * 1000, + keepalive_size = 64, + min_log_time = 0, + } + + local redis_cli = acid_redis:new(ip, port, r_opts) + + if pexpire ~= nil and cmd == 'hset' then + local cmd_and_args = { + {cmd, cmd_args}, + {'pexpire', {cmd_args[1], pexpire}}, + } + + local multi_rst, err_code, err_msg = redis_cli:transaction(cmd_and_args) + if err_code ~= nil then + return nil, err_code, err_msg + end + + if (tonumber(multi_rst[1]) ~= 1 or tonumber(multi_rst[1]) ~= 0) + and tonumber(multi_rst[2]) ~= 1 then + return nil, 'RunRedisCMDError', 'transaction HSET error' + end + + return + end + + if pexpire ~= nil then + cmd_args = tableutil.dup(cmd_args, true) + table.insert(cmd_args, 'PX') + table.insert(cmd_args, pexpire) + end + + return redis_cli[cmd](redis_cli, unpack(cmd_args)) +end + +local function run_xget_cmd(self, cmd, cmd_args, n, r) + local addrs, err_code, err_msg = get_redis_addrs(self, cmd_args[1], n, r) + if err_code ~= nil then + return nil, err_code, err_msg + end + + for nread, addr in ipairs(addrs) do + local ipport = str_split(addr, ':') + + local val, err_code, err_msg = run_cmd_on_redis(ipport[1], ipport[2], cmd, cmd_args) + if err_code ~= nil then + ngx.log(ngx.ERR, to_str(cmd, ' value to ', addr, + 'error. err_code=', err_code, ', err_msg=', err_msg)) + end + + if val ~= nil and val ~= ngx.null then + return {value=val, addr=addr} + end + + if nread > r then + break + end + end + + return nil, 'NotFound', to_str('cmd=', cmd, ', args=', cmd_args) +end + +local function run_xset_cmd(self, cmd, cmd_args, n, w, pexpire) + local addrs, err_code, err_msg = get_redis_addrs(self, cmd_args[1], n, w) + if err_code ~= nil then + return nil, err_code, err_msg + end + + local nok = 0 + for _, addr in ipairs(addrs) do + local ipport = str_split(addr, ':') + + local _, err_code, err_msg = + run_cmd_on_redis(ipport[1], ipport[2], cmd, cmd_args, pexpire) + if err_code == nil then + nok = nok + 1 + else + ngx.log(ngx.ERR, to_str(cmd, ' value to ', addr, + 'error. err_code=', err_code, ', err_msg=', err_msg)) + end + end + + if nok < w then + return nil, "QuorumNotEnough", to_str('w=', w, ', ok=', nok) + end +end + +function _M.hget(self, args, n, r) + return run_xget_cmd(self, 'hget', args, n, r) +end + +function _M.hset(self, args, n, w, expire) + return run_xset_cmd(self, 'hset', args, n, w, expire) +end + +function _M.get(self, args, n, r) + return run_xget_cmd(self, 'get', args, n, r) +end + +function _M.set(self, args, n, w, expires) + return run_xset_cmd(self, 'set', args, n, w, expires) +end + +function _M.new( _, name, get_redis_servers, opts) + local opts = opts or {} + + if redis_conf[name] == nil then + redis_conf[name] = { + expires = 0, + updates = 0, + semaphore = semaphore.new(1), + servers = {}, + chash = nil, + get_redis_servers = get_redis_servers, + optimize_choose_servers = + opts.optimize_choose_servers or optimize_choose_servers, + } + end + + local obj = { + conf = redis_conf[name], + } + + return setmetatable( obj, mt ) +end + +return _M From 0a1d4e2bb681d696ca3af50f4b0888438a735951 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Wed, 16 May 2018 14:41:02 +0800 Subject: [PATCH 06/11] add redis_proxy.lua --- lib/acid/redis_proxy.lua | 214 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 lib/acid/redis_proxy.lua diff --git a/lib/acid/redis_proxy.lua b/lib/acid/redis_proxy.lua new file mode 100644 index 0000000..5ef0952 --- /dev/null +++ b/lib/acid/redis_proxy.lua @@ -0,0 +1,214 @@ +local acid_json = require("acid.json") +local strutil = require("acid.strutil") +local tableutil = require("acid.tableutil") +local redis_chash = require("acid.redis_chash") +local aws_authenticator = require("resty.awsauth.aws_authenticator") + +local _M = {} +local mt = { __index = _M } + +local to_str = strutil.to_str + +local ERR_CODE = { + NotFound = ngx.HTTP_NOT_FOUND, + InvalidRequest = ngx.HTTP_BAD_REQUEST, + InvalidCommand = ngx.HTTP_FORBIDDEN, + QuorumNotEnough = ngx.HTTP_SERVICE_UNAVAILABLE, + RequestForbidden = ngx.HTTP_FORBIDDEN, + InvalidSignature = ngx.HTTP_FORBIDDEN, +} + +-- http method, redis opeartion, count of args, need value args, optional args name +local redis_cmd_model = { + -- get(key) + GET = {'GET', 'get', 1, false, {}}, + + -- set(key, val, expire=nil) + SET = {'PUT', 'set', 2, true, {'expire'}}, + + -- hget(hashname, hashkey) + HGET = {'GET', 'hget', 2, false, {}}, + + -- hset(hashname, hashkey, val, expire=nil) + HSET = {'PUT', 'hset', 3, true, {'expire'}}, +} + +local redis_cmd_names = tableutil.keys(redis_cmd_model) + +local function get_secret_key(access_key, secret_key) + return function(ctx) + if ctx.access_key ~= access_key then + return nil, 'InvalidAccessKey', 'access key does not exists: ' .. ctx.access_key + end + + return secret_key + end +end + +local function check_auth(self) + if ngx.var.server_addr == '127.0.0.1' + or self.access_key == nil + or self.secret_key == nil then + return + end + + local authenticator = aws_authenticator.new(self.get_secret_key) + local ctx, err_code, err_msg = authenticator:authenticate() + if err_code ~= nil then + ngx.log(ngx.INFO, err_code, ':', err_msg) + return nil, 'InvalidSignature', 'signature is not correct' + end + + if ctx.anonymous == true then + return nil, 'RequestForbidden', 'anonymous user are not allowed' + end +end + +local function output(rst, err_code, err_msg) + local status, body, headers = 200, '', {} + + local request_id = ngx.var.requestid + + if err_code ~= nil then + ngx.log( ngx.WARN, "requestid: ", request_id, + " err_code: ", err_code, " err_msg: ", err_msg ) + + status = ERR_CODE[err_code] or ngx.HTTP_BAD_REQUEST + + headers["Content-Type"] = "application/json" + + local Error = { + Code = err_code, + Message = err_msg, + RequestId = request_id, + } + body = acid_json.enc( Error ) + else + rst = rst or {} + + headers['X-REDIS-ADDR'] = rst.addr + body = rst.value or '' + end + + ngx.header["request-id"] = ngx.var.requestid + headers['Content-Length'] = #body + + ngx.status = status + + for k, v in pairs(headers) do + ngx.header[k] = v + end + + ngx.say(body) + ngx.eof() + ngx.exit(ngx.HTTP_OK) +end + +local function read_cmd_value() + local headers = ngx.req.get_headers() + local content_length = tonumber(headers['content-length']) + + if content_length == nil then + return nil, 'InvalidRequest', 'Content-Length is nil' + elseif content_length == 0 then + return '' + end + + ngx.req.read_body() + local value = ngx.req.get_body_data() + if value == nil then + return nil, 'InvalidRequest', 'Invalid request body' + end + + return value +end + +local function get_cmd_args() + --local uri_ptr = '^/redisproxy/v\\d+/(\\S+)/(\\S+)$' + local uri_ptr = '^/redisproxy/v\\d+/(\\S+?)/(\\S+)$' + local urilist = ngx.re.match(ngx.var.uri, uri_ptr, 'o') + if urilist == nil then + return nil, 'InvalidRequest', 'uri must like:'.. uri_ptr + end + + local cmd = urilist[1] + local cmd_args = strutil.split(urilist[2], '/') + + local cmd_model = redis_cmd_model[cmd] + if cmd_model == nil then + return nil, 'InvalidCommand', to_str('just support: ', redis_cmd_names) + end + + local http_method, cmd, nargs, needed_value, _ = unpack(cmd_model) + + if http_method ~= ngx.var.request_method then + return nil, 'InvalidRequest', + to_str(cmd, ' cmd request method must be ', http_method) + end + + if needed_value then + local cmd_val, err_code, err_msg = read_cmd_value() + if err_code ~= nil then + return nil, err_code, err_msg + end + + table.insert(cmd_args, cmd_val) + end + + if #(cmd_args) ~= nargs then + return nil, 'InvalidCommand', to_str(cmd, ' need ', nargs, ' args') + end + + local qs = ngx.req.get_uri_args() + + return { + cmd = cmd, + cmd_args = cmd_args, + expire = tonumber(qs.expire), + n = tonumber(qs.n) or 1, + w = tonumber(qs.w) or 1, + r = tonumber(qs.r) or 1, + } +end + +function _M.new(_, access_key, secret_key, get_redis_servers, opts) + local obj = { + access_key = access_key, + secret_key = secret_key, + get_secret_key = get_secret_key(access_key, secret_key), + redis_chash = redis_chash:new( + "cluster_redisproxy", get_redis_servers, opts) + } + + return setmetatable( obj, mt ) +end + +function _M.proxy(self) + local _, err_code, err_msg = check_auth(self) + if err_code ~= nil then + return output(nil, err_code, err_msg) + end + + local args, err_code, err_msg = get_cmd_args() + if err_code ~= nil then + return output(nil, err_code, err_msg) + end + + local cmd, cmd_args, expire = args.cmd, args.cmd_args, args.expire + local n, w, r = args.n, args.w, args.r + + local rst, err_code, err_msg + if cmd == 'hget' then + rst, err_code, err_msg = self.redis_chash:hget(cmd_args, n, r) + elseif cmd == 'hset' then + rst, err_code, err_msg = self.redis_chash:hset(cmd_args, n, w, expire) + elseif cmd == 'get' then + rst, err_code, err_msg = self.redis_chash:get(cmd_args, n, r) + elseif cmd == 'set' then + rst, err_code, err_msg = self.redis_chash:set(cmd_args, n, w, expire) + end + + return output(rst, err_code, err_msg) +end + +return _M From a61725f51afbb484245f7c8c5a8adada18a46172 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Thu, 17 May 2018 20:03:31 +0800 Subject: [PATCH 07/11] fix redis.lua --- lib/acid/redis.lua | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/acid/redis.lua b/lib/acid/redis.lua index e3900e1..593127a 100644 --- a/lib/acid/redis.lua +++ b/lib/acid/redis.lua @@ -4,13 +4,13 @@ local rpc_logging = require("acid.rpc_logging") local to_str = strutil.to_str -local _M = {} +local _M = { _VERSION = "0.1" } local mt = { __index = _M } local function get_redis_cli(self) local redis_cli, err_msg = resty_redis:new() if redis_cli == nil then - return nil, nil, 'NewRedisError', err_msg + return nil, 'NewRedisError', err_msg end redis_cli:set_timeout(self.timeout) @@ -99,19 +99,19 @@ function _M.transaction(self, cmds) local cmd, cmd_args = unpack(cmd_and_args) local rst, err_code, err_msg = self[cmd](self, unpack(cmd_args or {})) if err_code ~= nil then - self['discard'](self) + self:discard() return nil, err_code, err_msg end if rst ~= 'QUEUED' then - self['discard'](self) + self:discard() return nil, 'RunRedisCMDError', cmd .. ' no reply with the string QUEUED' end end - local multi_rst, err_code, err_msg = self['exec'](self) + local multi_rst, err_code, err_msg = self:exec() if err_code ~= nil then - self['discard'](self) + self:discard() return nil, err_code, err_msg end From acf65e4e05eace47af1a57a79f6b7c36ef945ff2 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Thu, 17 May 2018 20:57:43 +0800 Subject: [PATCH 08/11] fix redis_chash.lua --- lib/acid/redis_chash.lua | 54 ++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/lib/acid/redis_chash.lua b/lib/acid/redis_chash.lua index 59df62c..e833a8d 100644 --- a/lib/acid/redis_chash.lua +++ b/lib/acid/redis_chash.lua @@ -8,7 +8,7 @@ local semaphore = require("ngx.semaphore") local to_str = strutil.to_str local str_split = strutil.split -local _M = {} +local _M = { _VERSION = "0.1" } local mt = { __index = _M } --redis_conf = { @@ -86,11 +86,7 @@ local function get_chash(self) return nil end -local function optimize_choose_servers(addrs, least) - return addrs -end - -local function get_redis_addrs(self, k, n, least) +local function get_redis_addrs(self, k, n) local chash = get_chash(self) if chash == nil then return {} @@ -101,7 +97,11 @@ local function get_redis_addrs(self, k, n, least) return nil, err_code, err_msg end - return self.conf.optimize_choose_servers(addrs, least) + if self.conf.optimize_choose_servers ~= nil then + return self.conf.optimize_choose_servers(addrs) + end + + return addrs end local function run_cmd_on_redis(ip, port, cmd, cmd_args, pexpire) @@ -128,7 +128,8 @@ local function run_cmd_on_redis(ip, port, cmd, cmd_args, pexpire) if (tonumber(multi_rst[1]) ~= 1 or tonumber(multi_rst[1]) ~= 0) and tonumber(multi_rst[2]) ~= 1 then - return nil, 'RunRedisCMDError', 'transaction HSET error' + ngx.log(ngx.INFO, to_str('transaction runs hset cmd result: ', multi_rst)) + return nil, 'RunRedisCMDError', 'transaction runs hset cmd result error' end return @@ -143,13 +144,13 @@ local function run_cmd_on_redis(ip, port, cmd, cmd_args, pexpire) return redis_cli[cmd](redis_cli, unpack(cmd_args)) end -local function run_xget_cmd(self, cmd, cmd_args, n, r) - local addrs, err_code, err_msg = get_redis_addrs(self, cmd_args[1], n, r) +local function run_xget_cmd(self, cmd, cmd_args, n) + local addrs, err_code, err_msg = get_redis_addrs(self, cmd_args[1], n) if err_code ~= nil then return nil, err_code, err_msg end - for nread, addr in ipairs(addrs) do + for _, addr in ipairs(addrs) do local ipport = str_split(addr, ':') local val, err_code, err_msg = run_cmd_on_redis(ipport[1], ipport[2], cmd, cmd_args) @@ -161,17 +162,13 @@ local function run_xget_cmd(self, cmd, cmd_args, n, r) if val ~= nil and val ~= ngx.null then return {value=val, addr=addr} end - - if nread > r then - break - end end return nil, 'NotFound', to_str('cmd=', cmd, ', args=', cmd_args) end -local function run_xset_cmd(self, cmd, cmd_args, n, w, pexpire) - local addrs, err_code, err_msg = get_redis_addrs(self, cmd_args[1], n, w) +local function run_xset_cmd(self, cmd, cmd_args, n, pexpire) + local addrs, err_code, err_msg = get_redis_addrs(self, cmd_args[1], n) if err_code ~= nil then return nil, err_code, err_msg end @@ -190,25 +187,23 @@ local function run_xset_cmd(self, cmd, cmd_args, n, w, pexpire) end end - if nok < w then - return nil, "QuorumNotEnough", to_str('w=', w, ', ok=', nok) - end + return nok end -function _M.hget(self, args, n, r) - return run_xget_cmd(self, 'hget', args, n, r) +function _M.hget(self, args, n) + return run_xget_cmd(self, 'hget', args, n) end -function _M.hset(self, args, n, w, expire) - return run_xset_cmd(self, 'hset', args, n, w, expire) +function _M.hset(self, args, n, expire) + return run_xset_cmd(self, 'hset', args, n, expire) end -function _M.get(self, args, n, r) - return run_xget_cmd(self, 'get', args, n, r) +function _M.get(self, args, n) + return run_xget_cmd(self, 'get', args, n) end -function _M.set(self, args, n, w, expires) - return run_xset_cmd(self, 'set', args, n, w, expires) +function _M.set(self, args, n, expires) + return run_xset_cmd(self, 'set', args, n, expires) end function _M.new( _, name, get_redis_servers, opts) @@ -222,8 +217,7 @@ function _M.new( _, name, get_redis_servers, opts) servers = {}, chash = nil, get_redis_servers = get_redis_servers, - optimize_choose_servers = - opts.optimize_choose_servers or optimize_choose_servers, + optimize_choose_servers = opts.optimize_choose_servers } end From c76cddd35537abb6b5c89b0935e5f55635b6cbf8 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Thu, 17 May 2018 20:58:29 +0800 Subject: [PATCH 09/11] fix redis_proxy.lua --- lib/acid/redis_proxy.lua | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/lib/acid/redis_proxy.lua b/lib/acid/redis_proxy.lua index 5ef0952..30c7499 100644 --- a/lib/acid/redis_proxy.lua +++ b/lib/acid/redis_proxy.lua @@ -1,10 +1,11 @@ local acid_json = require("acid.json") +local acid_nwr = require("acid.nwr") local strutil = require("acid.strutil") local tableutil = require("acid.tableutil") local redis_chash = require("acid.redis_chash") local aws_authenticator = require("resty.awsauth.aws_authenticator") -local _M = {} +local _M = { _VERSION = "0.1" } local mt = { __index = _M } local to_str = strutil.to_str @@ -124,11 +125,10 @@ local function read_cmd_value() end local function get_cmd_args() - --local uri_ptr = '^/redisproxy/v\\d+/(\\S+)/(\\S+)$' - local uri_ptr = '^/redisproxy/v\\d+/(\\S+?)/(\\S+)$' - local urilist = ngx.re.match(ngx.var.uri, uri_ptr, 'o') + local uri_regex = '^/redisproxy/v\\d+/(\\S+?)/(\\S+)$' + local urilist = ngx.re.match(ngx.var.uri, uri_regex, 'o') if urilist == nil then - return nil, 'InvalidRequest', 'uri must like:'.. uri_ptr + return nil, 'InvalidRequest', 'uri must like:'.. uri_regex end local cmd = urilist[1] @@ -165,9 +165,11 @@ local function get_cmd_args() cmd = cmd, cmd_args = cmd_args, expire = tonumber(qs.expire), - n = tonumber(qs.n) or 1, - w = tonumber(qs.w) or 1, - r = tonumber(qs.r) or 1, + nwr = { + tonumber(qs.n) or 1, + tonumber(qs.w) or 1, + tonumber(qs.r) or 1, + }, } end @@ -194,18 +196,18 @@ function _M.proxy(self) return output(nil, err_code, err_msg) end - local cmd, cmd_args, expire = args.cmd, args.cmd_args, args.expire - local n, w, r = args.n, args.w, args.r + local cmd, cmd_args, nwr, expire = + args.cmd, args.cmd_args, args.nwr, args.expire - local rst, err_code, err_msg - if cmd == 'hget' then - rst, err_code, err_msg = self.redis_chash:hget(cmd_args, n, r) - elseif cmd == 'hset' then - rst, err_code, err_msg = self.redis_chash:hset(cmd_args, n, w, expire) - elseif cmd == 'get' then - rst, err_code, err_msg = self.redis_chash:get(cmd_args, n, r) - elseif cmd == 'set' then - rst, err_code, err_msg = self.redis_chash:set(cmd_args, n, w, expire) + local nok, rst, err_code, err_msg + if cmd == 'set' or cmd == 'hset' then + nok, err_code, err_msg = + self.redis_chash[cmd](self.redis_chash, cmd_args, nwr[1], expire) + if err_code == nil then + _, err_code, err_msg = acid_nwr.assert_w_ok(nwr, nok) + end + else + rst, err_code, err_msg = self.redis_chash[cmd](self.redis_chash, cmd_args, nwr[3]) end return output(rst, err_code, err_msg) From bfade980690d0eb01ffc859cd2d499e6ebee4e9b Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Fri, 18 May 2018 17:37:29 +0800 Subject: [PATCH 10/11] add nwr.lua --- lib/acid/nwr.lua | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 lib/acid/nwr.lua diff --git a/lib/acid/nwr.lua b/lib/acid/nwr.lua new file mode 100644 index 0000000..a5bd428 --- /dev/null +++ b/lib/acid/nwr.lua @@ -0,0 +1,15 @@ +local strutil = require("acid.strutil") + +local _M = { _VERSION = "0.1" } + +local to_str = strutil.to_str + +function _M.assert_w_ok(nwr, wok) + if wok >= nwr[2] then + return true + end + + return nil, "QuorumNotEnough", to_str('nwr=', nwr, ', wok=', wok) +end + +return _M From 9c8299716bbbe775ad7700ab731149bdc2cf14d8 Mon Sep 17 00:00:00 2001 From: shuwen5 Date: Wed, 23 May 2018 17:31:13 +0800 Subject: [PATCH 11/11] simplify redis_cmd_model --- lib/acid/redis_proxy.lua | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/acid/redis_proxy.lua b/lib/acid/redis_proxy.lua index 30c7499..bea75ab 100644 --- a/lib/acid/redis_proxy.lua +++ b/lib/acid/redis_proxy.lua @@ -19,19 +19,19 @@ local ERR_CODE = { InvalidSignature = ngx.HTTP_FORBIDDEN, } --- http method, redis opeartion, count of args, need value args, optional args name +-- http method, count of args, need value args, optional args name local redis_cmd_model = { -- get(key) - GET = {'GET', 'get', 1, false, {}}, + GET = {'GET', 1, false, {}}, -- set(key, val, expire=nil) - SET = {'PUT', 'set', 2, true, {'expire'}}, + SET = {'PUT', 2, true, {'expire'}}, -- hget(hashname, hashkey) - HGET = {'GET', 'hget', 2, false, {}}, + HGET = {'GET', 2, false, {}}, -- hset(hashname, hashkey, val, expire=nil) - HSET = {'PUT', 'hset', 3, true, {'expire'}}, + HSET = {'PUT', 3, true, {'expire'}}, } local redis_cmd_names = tableutil.keys(redis_cmd_model) @@ -139,7 +139,7 @@ local function get_cmd_args() return nil, 'InvalidCommand', to_str('just support: ', redis_cmd_names) end - local http_method, cmd, nargs, needed_value, _ = unpack(cmd_model) + local http_method, nargs, needed_value, _ = unpack(cmd_model) if http_method ~= ngx.var.request_method then return nil, 'InvalidRequest', @@ -162,7 +162,7 @@ local function get_cmd_args() local qs = ngx.req.get_uri_args() return { - cmd = cmd, + cmd = string.lower(cmd), cmd_args = cmd_args, expire = tonumber(qs.expire), nwr = {