diff --git a/.gitignore b/.gitignore index 014e7e3e..2556202e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# pyc +*.pyc + # Compiled Object files *.lo *.o diff --git a/.travis.yml b/.travis.yml index b0de971d..254d91a0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,3 @@ language: c -script: CFLAGS="-ggdb3 -O0" autoreconf -fvi && ./configure --enable-debug=log && make && sudo make install +script: bash ./travis.sh diff --git a/notes/redis.md b/notes/redis.md index f969eb83..1ecb1e83 100644 --- a/notes/redis.md +++ b/notes/redis.md @@ -79,9 +79,7 @@ +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | MGET | Yes | MGET key [key ...] | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ - | MSET | No | MSET key value [key value ...] | - +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ - | MSETNX | No | MSETNX key value [key value ...] | + | MSET | Yes* | MSET key value [key value ...] | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | PSETEX | Yes | PSETEX key milliseconds value | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ @@ -98,6 +96,8 @@ | STRLEN | Yes | STRLEN key | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ +* MSET support is not Atomic + ### Hashes +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ diff --git a/scripts/benchmark-mget.py b/scripts/benchmark-mget.py new file mode 100644 index 00000000..4fbcb6b3 --- /dev/null +++ b/scripts/benchmark-mget.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +#coding: utf-8 +#file : test_mget.py +#author : ning +#date : 2014-04-01 13:15:48 + +import os +import re +import commands + +ports = [ + 4001, # before improve + 4000, # after improve + 2000 # redis +] + +def system(cmd): + return commands.getoutput(cmd) + +def extra(regex, text): + match = re.search(regex, text, re.DOTALL) + if match: + return match.group(1) + +def testit(): + for mget_size in [10, 100, 1000, 10000]: + for port in ports: + cnt = 100*1000 / mget_size + clients = 50 + if mget_size == 10000: + clients = 2 + cmd = 'cd /home/ning/xredis/deploy-srcs/redis-2.8.3/src && ./redis-benchmark.%d -n %d -p %d -t mget -r 1000000000 -c %d' % (mget_size, cnt, port, clients) + #print cmd + rst = system(cmd) + + #100.00% <= 2 milliseconds + #28089.89 requests per second + rtime = extra('100.00% <= (\d+) milliseconds', rst) + qps = extra('([\.\d]+) requests per second', rst) + + print 'mget_size=%d on %d: pqs: %s, rtime: %s' % (mget_size, port, qps, rtime) + +testit() diff --git a/scripts/test_mget_mset.py b/scripts/test_mget_mset.py new file mode 100644 index 00000000..f361a479 --- /dev/null +++ b/scripts/test_mget_mset.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python +#coding: utf-8 +#file : test_mget.py +#author : ning +#date : 2014-04-01 13:15:48 + +''' +usage: + +export REDIS_HOST=127.0.0.1 +export REDIS_PORT=4000 +nosetests + +''' + +import os +import redis + +host = os.environ['REDIS_HOST'] +port = int(os.environ['REDIS_PORT']) + + +def test_mget(cnt=10): + print 'test_many', cnt + r = redis.StrictRedis(host, port) + + def insert_by_pipeline(): + pipe = r.pipeline(transaction=False) + for i in range(cnt): + pipe.set('kkk-%s'%i, 'vvv-%s'%i) + pipe.execute() + + def insert_by_mset(): + kv = {'kkk-%s' % i :'vvv-%s' % i for i in range(cnt)} + ret = r.mset(**kv) + + insert_by_mset() + keys = ['kkk-%s' % i for i in range(cnt)] + + #mget to check + vals = r.mget(keys) + #print 'vals', vals + for i in range(cnt): + assert('vvv-%s'%i == vals[i]) + + #del + assert (cnt == r.delete(*keys) ) + + #mget again + vals = r.mget(keys) + for i in range(cnt): + assert(None == vals[i]) + +def test_many_mget(): + for i in range(1, 10000, 17): + test_mget(i) + pass + +def test_large_mget(cnt=5): + r = redis.StrictRedis(host, port) + + kv = {} + for i in range(cnt): + kv['kkx-%s' % i] = os.urandom(1024*1024*8) + + #insert + for i in range(cnt): + key = 'kkx-%s' % i + r.set(key, kv[key]) + + keys = ['kkx-%s' % i for i in range(cnt)] + + #mget to check + vals = r.mget(keys) + for i in range(cnt): + key = 'kkx-%s' % i + assert(kv[key] == vals[i]) + + diff --git a/src/nc_core.c b/src/nc_core.c index ee3beeca..0ddd4b00 100644 --- a/src/nc_core.c +++ b/src/nc_core.c @@ -172,9 +172,9 @@ core_send(struct context *ctx, struct conn *conn) status = conn->send(ctx, conn); if (status != NC_OK) { - log_debug(LOG_INFO, "send on %c %d failed: %s", + log_debug(LOG_INFO, "send on %c %d failed: status:%d errno: %d %s", conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd, - strerror(errno)); + status, errno, strerror(errno)); } return status; diff --git a/src/nc_log.c b/src/nc_log.c index fa0ae543..4111c9fa 100644 --- a/src/nc_log.c +++ b/src/nc_log.c @@ -250,5 +250,12 @@ _log_hexdump(const char *file, int line, char *data, int datalen, l->nerror++; } + if (len >= size - 1){ + n = nc_write(l->fd, "\n", 1); + if (n < 0) { + l->nerror++; + } + } + errno = errno_save; } diff --git a/src/nc_log.h b/src/nc_log.h index 1b60c079..b05c2029 100644 --- a/src/nc_log.h +++ b/src/nc_log.h @@ -38,7 +38,7 @@ struct logger { #define LOG_VVVERB 10 /* verbose messages on ganga */ #define LOG_PVERB 11 /* periodic verbose messages on crack */ -#define LOG_MAX_LEN 256 /* max length of log message */ +#define LOG_MAX_LEN 512 /* max length of log message */ /* * log_stderr - log to stderr diff --git a/src/nc_message.c b/src/nc_message.c index 654cdf9a..139305a4 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -17,6 +17,7 @@ #include #include +#include #include @@ -234,7 +235,9 @@ _msg_get(void) msg->end = NULL; msg->frag_owner = NULL; + msg->frag_seq = NULL; msg->nfrag = 0; + msg->nfrag_done = 0; msg->frag_id = 0; msg->narg_start = NULL; @@ -361,10 +364,14 @@ msg_put(struct msg *msg) } void -msg_dump(struct msg *msg) +msg_dump(struct msg *msg, int level) { struct mbuf *mbuf; + if (log_loggable(level) == 0) { + return; + } + loga("msg dump id %"PRIu64" request %d len %"PRIu32" type %d done %d " "error %d (err %d)", msg->id, msg->request, msg->mlen, msg->type, msg->done, msg->error, msg->err); @@ -377,7 +384,16 @@ msg_dump(struct msg *msg) q = mbuf->last; len = q - p; - loga_hexdump(p, len, "mbuf with %ld bytes of data", len); + loga_hexdump(p, len, "mbuf [%p] with %ld bytes of data", p, len); + } +} + +void +msg_reset_mbufs(struct msg *msg) +{ + struct mbuf *mbuf; + STAILQ_FOREACH(mbuf, &msg->mhdr, next) { + mbuf->pos = mbuf->last = mbuf->start; } } @@ -412,43 +428,507 @@ msg_empty(struct msg *msg) return msg->mlen == 0 ? true : false; } -static rstatus_t -msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) +static struct mbuf * +get_mbuf(struct msg *msg) { - struct msg *nmsg; - struct mbuf *mbuf, *nbuf; + struct mbuf *mbuf; mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next); - if (msg->pos == mbuf->last) { - /* no more data to parse */ - conn->recv_done(ctx, conn, msg, NULL); - return NC_OK; + if (mbuf == NULL || mbuf_full(mbuf)) { + mbuf = mbuf_get(); + if (mbuf == NULL) { + return NULL; + } + + mbuf_insert(&msg->mhdr, mbuf); + msg->pos = mbuf->pos; } + ASSERT(mbuf->end - mbuf->last > 0); + return mbuf; +} - /* - * Input mbuf has un-parsed data. Split mbuf of the current message msg - * into (mbuf, nbuf), where mbuf is the portion of the message that has - * been parsed and nbuf is the portion of the message that is un-parsed. - * Parse nbuf as a new message nmsg in the next iteration. - */ - nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL); - if (nbuf == NULL) { +static void +msg_make_reply(struct context *ctx, struct conn *conn, struct msg *req) { + struct mbuf *mbuf; + struct msg *msg; + + msg = msg_get(conn, true, conn->redis); /*replay*/ + if (msg == NULL) { + conn->err = errno; + return; + } + + mbuf = get_mbuf(msg); + if (mbuf == NULL) { + msg_put(msg); + return; + } + + req->peer = msg; + msg->peer = req; + msg->request = 0; + + req->done = 1; + /*event_add_out(ctx->ep, conn);*/ + /*conn->dequeue_inq(ctx, conn, req);*/ + conn->enqueue_outq(ctx, conn, req); +} + +static uint32_t +key_to_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen){ + /*hash_tag*/ + if (!string_empty(&pool->hash_tag)) { + struct string *tag = &pool->hash_tag; + uint8_t *tag_start, *tag_end; + + tag_start = nc_strchr(key, key + keylen, tag->data[0]); + if (tag_start != NULL) { + tag_end = nc_strchr(tag_start + 1, key + keylen, tag->data[1]); + if (tag_end != NULL) { + key = tag_start + 1; + keylen = (uint32_t)(tag_end - key); + } + } + } + return server_pool_idx(pool, key, keylen); +} + +/* + * parse next key in mget request, update + * r->key_start + * r->key_end + * */ +static rstatus_t +msg_fragment_argx_update_keypos(struct msg *r){ + struct mbuf *buf; + uint8_t * p; + uint32_t len = 0; + uint32_t keylen = 0; + + for(buf = STAILQ_FIRST(&r->mhdr); buf->pos >= buf->last; buf = STAILQ_FIRST(&r->mhdr)){ + mbuf_remove(&r->mhdr, buf); + mbuf_put(buf); + } + + p = buf->pos; + ASSERT(*p == '$'); + p ++; + + len = 0; + for (; p < buf->last && isdigit(*p); p++) { + len = len * 10 + (uint32_t)(*p - '0'); + } + + keylen = len; + len += (uint32_t)CRLF_LEN * 2; + len += (uint32_t)(p - buf->pos); + + if(mbuf_length(buf) < len - CRLF_LEN){ /*key no in this buf, remove it.*/ + len -= mbuf_length(buf); + mbuf_remove(&r->mhdr, buf); + mbuf_put(buf); + + buf = STAILQ_FIRST(&r->mhdr); + } + + r->key_end = buf->pos + len - CRLF_LEN; + r->key_start = r->key_end - keylen; + buf->pos += len - CRLF_LEN; + + len = CRLF_LEN; + while(mbuf_length(buf) < len){ /*eat CRLF*/ + len -= mbuf_length(buf); + buf->pos = buf->last; + + buf = STAILQ_NEXT(buf, next); + } + buf->pos += len; + + return NC_OK; +} + +static struct mbuf * +msg_ensure_mbuf(struct msg *msg, uint32_t len){ + struct mbuf *mbuf; + if (STAILQ_EMPTY(&msg->mhdr) + || mbuf_size(STAILQ_LAST(&msg->mhdr, mbuf, next)) < len ){ + mbuf = mbuf_get(); + if (mbuf == NULL) { + return NULL; + } + mbuf_insert(&msg->mhdr, mbuf); + }else{ + mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next); + } + return mbuf; +} + +static rstatus_t +msg_append_key(struct msg *msg, uint8_t * key, uint32_t keylen){ + uint32_t len; + struct mbuf *mbuf; + uint8_t printbuf[32]; + + /*1. keylen*/ + len = (uint32_t)nc_snprintf(printbuf, sizeof(printbuf), "$%d\r\n", keylen); + mbuf = msg_ensure_mbuf(msg, len); + if (mbuf == NULL) { + nc_free(msg); return NC_ENOMEM; } + mbuf_copy(mbuf, printbuf, len); + msg->mlen += len; - nmsg = msg_get(msg->owner, msg->request, conn->redis); - if (nmsg == NULL) { - mbuf_put(nbuf); + /*2. key*/ + mbuf = msg_ensure_mbuf(msg, keylen); + if (mbuf == NULL) { + nc_free(msg); return NC_ENOMEM; } - mbuf_insert(&nmsg->mhdr, nbuf); - nmsg->pos = nbuf->pos; + msg->key_start = mbuf->last; /*update key_start*/ + mbuf_copy(mbuf, key, keylen); + msg->mlen += keylen; + msg->key_end = mbuf->last; /*update key_start*/ - /* update length of current (msg) and new message (nmsg) */ - nmsg->mlen = mbuf_length(nbuf); - msg->mlen -= nmsg->mlen; + /*3. CRLF*/ + mbuf = msg_ensure_mbuf(msg, CRLF_LEN); + if (mbuf == NULL) { + nc_free(msg); + return NC_ENOMEM; + } + mbuf_copy(mbuf, (uint8_t *)CRLF, CRLF_LEN); + msg->mlen += (uint32_t)CRLF_LEN; + return NC_OK; +} - conn->recv_done(ctx, conn, msg, nmsg); + +static rstatus_t +msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg, uint32_t key_step){ + struct server_pool *pool; + struct mbuf *mbuf, *sub_msg_mbuf; + struct msg ** sub_msgs; + uint32_t i; + + ASSERT(conn->client && !conn->proxy); + ASSERT(conn->owner != NULL); + + /*init sub_msgs and msg->frag_seq*/ + pool = conn->owner; + sub_msgs = nc_alloc(pool->ncontinuum * sizeof(void *) ); + for(i = 0; i < pool->ncontinuum; i++){ + sub_msgs[i] = msg_get(msg->owner, msg->request, conn->redis); + } + msg->frag_seq = nc_alloc(sizeof(struct msg *) * msg->narg); /*the point for each key, point to sub_msgs elements*/ + + mbuf = STAILQ_FIRST(&msg->mhdr); + mbuf->pos = mbuf->start; + + for(i = 0; i< 3; i++){ /*eat *narg\r\n$4\r\nMGET\r\n*/ + for(;*(mbuf->pos) != '\n';){ + mbuf->pos ++; + } + mbuf->pos ++; + } + + msg->frag_id = ++frag_id; + msg->first_fragment = 1; + msg->nfrag = 0; + msg->frag_owner = msg; + + for(i = 1; i < msg->narg; i++){ /*for each key*/ + uint8_t * key; + uint32_t keylen; + uint32_t idx; + struct msg *sub_msg; + uint32_t len; + + msg_fragment_argx_update_keypos(msg); + key = msg->key_start; + keylen = (uint32_t)(msg->key_end - msg->key_start); + idx = key_to_idx(pool, key, keylen); + sub_msg = sub_msgs[idx]; + + msg->frag_seq[i] = sub_msgs[idx]; + + sub_msg->narg ++; + if (NC_OK != msg_append_key(sub_msg, key, keylen)){ + nc_free(sub_msgs); + return NC_ENOMEM; + } + + if(key_step == 1){ /*mget,del*/ + continue; + } else{ /*mset, msetex*/ + len = redis_copy_bulk(sub_msg, msg); + log_debug(LOG_VVERB, "redis_copy_bulk for mset copy bytes: %d", len); + i++; + sub_msg->narg ++; + } + } + + msg_make_reply(ctx, conn, msg) ; + + for(i = 0; i < pool->ncontinuum; i++){ /*prepend mget header, and forward it*/ + struct msg* sub_msg = sub_msgs[i]; + if(STAILQ_EMPTY(&sub_msg->mhdr)){ + msg_put(sub_msg); + continue; + } + + sub_msg_mbuf = mbuf_get(); + if (sub_msg_mbuf == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + if (msg->type == MSG_REQ_REDIS_MGET){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$4\r\nmget\r\n", + sub_msg->narg + 1); + }else if (msg->type == MSG_REQ_REDIS_DEL){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$3\r\ndel\r\n", + sub_msg->narg + 1); + }else if (msg->type == MSG_REQ_REDIS_MSET){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$4\r\nmset\r\n", + sub_msg->narg + 1); + } + sub_msg->mlen += mbuf_length(sub_msg_mbuf); + + sub_msg->type = msg->type; + sub_msg->frag_id = msg->frag_id; + sub_msg->frag_owner = msg->frag_owner; + STAILQ_INSERT_HEAD(&sub_msg->mhdr, sub_msg_mbuf, next); + + conn->recv_done(ctx, conn, sub_msg, nmsg); + msg->nfrag ++; + } + + nc_free(sub_msgs); + return NC_OK; +} + + + + + +/* + * parse next key in mget request, update + * r->key_start + * r->key_end + * */ +static rstatus_t +msg_fragment_retrieval_update_keypos(struct msg *r){ + struct mbuf *buf; + uint8_t * p; + + for(buf = STAILQ_FIRST(&r->mhdr); buf->pos >= buf->last; buf = STAILQ_FIRST(&r->mhdr)){ + mbuf_remove(&r->mhdr, buf); + mbuf_put(buf); + } + + p = buf->pos; + for (; p < buf->last && isspace(*p); p++) { + /*eat spaces*/ + } + + r->key_start = p; + for (; p < buf->last && !isspace(*p); p++) { + /*read key*/ + } + r->key_end = p; + + for (; p < buf->last && isspace(*p); p++) { + /*eat spaces*/ + } + buf->pos = p; + return NC_OK; +} + +static rstatus_t +msg_append_memcache_key(struct msg *msg, uint8_t * key, uint32_t keylen){ + struct mbuf *mbuf; + + mbuf = msg_ensure_mbuf(msg, keylen+2); + if (mbuf == NULL) { + nc_free(msg); + return NC_ENOMEM; + } + msg->key_start = mbuf->last; /*update key_start*/ + mbuf_copy(mbuf, key, keylen); + msg->mlen += keylen; + msg->key_end = mbuf->last; /*update key_start*/ + + mbuf_copy(mbuf, (uint8_t *)" ", 1); + msg->mlen += 1; + return NC_OK; +} + + +static rstatus_t +msg_fragment_retrieval(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg){ + struct server_pool *pool; + struct mbuf *mbuf, *sub_msg_mbuf; + struct msg ** sub_msgs; + uint32_t i; + + ASSERT(conn->client && !conn->proxy); + ASSERT(conn->owner != NULL); + + /*init sub_msgs and msg->frag_seq*/ + pool = conn->owner; + sub_msgs = nc_alloc(pool->ncontinuum * sizeof(void *) ); + for(i = 0; i < pool->ncontinuum; i++){ + sub_msgs[i] = msg_get(msg->owner, msg->request, conn->redis); + } + + log_debug(LOG_VERB, "msg:%p, msg->narg:%d", msg, msg->narg); + msg->frag_seq = nc_alloc(sizeof(struct msg *) * msg->narg); /*the point for each key, point to sub_msgs elements*/ + + mbuf = STAILQ_FIRST(&msg->mhdr); + mbuf->pos = mbuf->start; + + for(;*(mbuf->pos) != ' ';){ /*eat 'get '*/ + mbuf->pos ++; + } + mbuf->pos ++; + + msg->frag_id = ++frag_id; + msg->first_fragment = 1; + msg->nfrag = 0; + msg->frag_owner = msg; + + for(i = 1; i < msg->narg; i++){ /*for each key*/ + uint8_t * key; + uint32_t keylen; + uint32_t idx; + struct msg *sub_msg; + + msg_fragment_retrieval_update_keypos(msg); + key = msg->key_start; + keylen = (uint32_t)(msg->key_end - msg->key_start); + idx = key_to_idx(pool, key, keylen); + sub_msg = sub_msgs[idx]; + + msg->frag_seq[i] = sub_msgs[idx]; + + sub_msg->narg ++; + if (NC_OK != msg_append_memcache_key(sub_msg, key, keylen)){ + nc_free(sub_msgs); + return NC_ENOMEM; + } + } + + if (STAILQ_EMPTY(&msg->mhdr)){ + mbuf = mbuf_get(); + if (mbuf == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + mbuf_insert(&msg->mhdr, mbuf); + } + + /*conn->recv_done(ctx, conn, msg, nmsg);*/ + msg_make_reply(ctx, conn, msg); + + for(i = 0; i < pool->ncontinuum; i++){ /*prepend mget header, and forward it*/ + struct msg* sub_msg = sub_msgs[i]; + if(STAILQ_EMPTY(&sub_msg->mhdr)){ + msg_put(sub_msg); + continue; + } + + /*prepend get/gets (TODO: use a function)*/ + sub_msg_mbuf = mbuf_get(); + if (sub_msg_mbuf == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + if (msg->type == MSG_REQ_MC_GET){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "get "); + }else if (msg->type == MSG_REQ_MC_GETS){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "gets "); + } + sub_msg->mlen += mbuf_length(sub_msg_mbuf); + STAILQ_INSERT_HEAD(&sub_msg->mhdr, sub_msg_mbuf, next); + + /*append \r\n*/ + sub_msg_mbuf = mbuf_get(); + if (sub_msg_mbuf == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "\r\n"); + sub_msg->mlen += mbuf_length(sub_msg_mbuf); + STAILQ_INSERT_TAIL(&sub_msg->mhdr, sub_msg_mbuf, next); + + sub_msg->type = msg->type; + sub_msg->frag_id = msg->frag_id; + sub_msg->frag_owner = msg->frag_owner; + + /*msg_dump(sub_msg, LOG_VERB);*/ + conn->recv_done(ctx, conn, sub_msg, nmsg); + msg->nfrag ++; + } + + nc_free(sub_msgs); + return NC_OK; +} + +static rstatus_t +msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) +{ + struct msg *nmsg; /*next msg*/ + struct mbuf *mbuf, *nbuf; + + mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next); + if (msg->pos == mbuf->last) { + /* no more data to parse */ + nmsg = NULL; + }else{ + + /* + * Input mbuf has un-parsed data. Split mbuf of the current message msg + * into (mbuf, nbuf), where mbuf is the portion of the message that has + * been parsed and nbuf is the portion of the message that is un-parsed. + * Parse nbuf as a new message nmsg in the next iteration. + */ + nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL); + if (nbuf == NULL) { + return NC_ENOMEM; + } + + nmsg = msg_get(msg->owner, msg->request, conn->redis); + if (nmsg == NULL) { + mbuf_put(nbuf); + return NC_ENOMEM; + } + mbuf_insert(&nmsg->mhdr, nbuf); + nmsg->pos = nbuf->pos; + + /* update length of current (msg) and new message (nmsg) */ + nmsg->mlen = mbuf_length(nbuf); + msg->mlen -= nmsg->mlen; + } + + if(redis_argx(msg)){ + rstatus_t status = msg_fragment_argx(ctx, conn, msg, nmsg, 1); + if (status != NC_OK) { + return status; + } + return NC_OK; + }else if(redis_arg2x(msg)){ + rstatus_t status = msg_fragment_argx(ctx, conn, msg, nmsg, 2); + if (status != NC_OK) { + return status; + } + return NC_OK; + }else if(memcache_retrieval(msg)){ + rstatus_t status = msg_fragment_retrieval(ctx, conn, msg, nmsg); + if (status != NC_OK) { + return status; + } + return NC_OK; + }else{ + conn->recv_done(ctx, conn, msg, nmsg); + } return NC_OK; } @@ -742,11 +1222,12 @@ msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg) } } - ASSERT(!TAILQ_EMPTY(&send_msgq) && nsend != 0); - conn->smsg = NULL; - - n = conn_sendv(conn, &sendv, nsend); + if(!TAILQ_EMPTY(&send_msgq) && nsend != 0){ + n = conn_sendv(conn, &sendv, nsend); + }else{ + n = 0; + } nsent = n > 0 ? (size_t)n : 0; @@ -794,7 +1275,7 @@ msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg) ASSERT(TAILQ_EMPTY(&send_msgq)); - if (n > 0) { + if (n >= 0) { return NC_OK; } diff --git a/src/nc_message.h b/src/nc_message.h index b08221f8..dd32a456 100644 --- a/src/nc_message.h +++ b/src/nc_message.h @@ -80,6 +80,7 @@ typedef enum msg_type { MSG_REQ_REDIS_INCRBY, MSG_REQ_REDIS_INCRBYFLOAT, MSG_REQ_REDIS_MGET, + MSG_REQ_REDIS_MSET, /*mset*/ MSG_REQ_REDIS_PSETEX, MSG_REQ_REDIS_RESTORE, MSG_REQ_REDIS_SET, @@ -198,7 +199,9 @@ struct msg { struct msg *frag_owner; /* owner of fragment message */ uint32_t nfrag; /* # fragment */ + uint32_t nfrag_done; /* # fragment */ uint64_t frag_id; /* id of fragmented message */ + struct msg **frag_seq; /* sequence of fragment message, one element for each mget key*/ err_t err; /* errno on error? */ unsigned error:1; /* error? */ @@ -225,7 +228,7 @@ void msg_deinit(void); struct msg *msg_get(struct conn *conn, bool request, bool redis); void msg_put(struct msg *msg); struct msg *msg_get_error(bool redis, err_t err); -void msg_dump(struct msg *msg); +void msg_dump(struct msg *msg, int level); bool msg_empty(struct msg *msg); rstatus_t msg_recv(struct context *ctx, struct conn *conn); rstatus_t msg_send(struct context *ctx, struct conn *conn); @@ -233,6 +236,8 @@ rstatus_t msg_send(struct context *ctx, struct conn *conn); struct msg *req_get(struct conn *conn); void req_put(struct msg *msg); bool req_done(struct conn *conn, struct msg *msg); +void msg_reset_mbufs(struct msg *msg); + bool req_error(struct conn *conn, struct msg *msg); void req_server_enqueue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg); void req_server_dequeue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg); diff --git a/src/nc_request.c b/src/nc_request.c index d89f99e1..9a11a158 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -84,6 +84,10 @@ req_done(struct conn *conn, struct msg *msg) return true; } + if(msg->nfrag_done < msg->nfrag){ + return false; + } + /* check all fragments of the given request vector are done */ for (pmsg = msg, cmsg = TAILQ_PREV(msg, msg_tqh, c_tqe); @@ -104,9 +108,9 @@ req_done(struct conn *conn, struct msg *msg) } } - if (!pmsg->last_fragment) { - return false; - } + /*if (!pmsg->last_fragment) {*/ + /*return false;*/ + /*}*/ /* * At this point, all the fragments including the last fragment have @@ -133,7 +137,7 @@ req_done(struct conn *conn, struct msg *msg) nfragment++; } - ASSERT(msg->frag_owner->nfrag == nfragment); + /*ASSERT(msg->frag_owner->nfrag == nfragment);*/ msg->post_coalesce(msg->frag_owner); @@ -143,6 +147,7 @@ req_done(struct conn *conn, struct msg *msg) return true; } + /* * Return true if request is in error, false otherwise * @@ -504,7 +509,7 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg *msg, ASSERT(conn->client && !conn->proxy); ASSERT(msg->request); ASSERT(msg->owner == conn); - ASSERT(conn->rmsg == msg); + /*ASSERT(conn->rmsg == msg);*/ ASSERT(nmsg == NULL || nmsg->request); /* enqueue next message (request), if any */ diff --git a/src/nc_server.c b/src/nc_server.c index 40705167..4ba7a72e 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -368,6 +368,10 @@ server_close(struct context *ctx, struct conn *conn) msg->error = 1; msg->err = conn->err; + if (msg->frag_owner != NULL){ + msg->frag_owner->nfrag_done ++; + } + if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { event_add_out(ctx->evb, msg->owner); } @@ -397,6 +401,9 @@ server_close(struct context *ctx, struct conn *conn) msg->done = 1; msg->error = 1; msg->err = conn->err; + if (msg->frag_owner != NULL){ + msg->frag_owner->nfrag_done ++; + } if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { event_add_out(ctx->evb, msg->owner); @@ -608,10 +615,9 @@ server_pool_hash(struct server_pool *pool, uint8_t *key, uint32_t keylen) return pool->key_hash((char *)key, keylen); } -static struct server * -server_pool_server(struct server_pool *pool, uint8_t *key, uint32_t keylen) +uint32_t +server_pool_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen) { - struct server *server; uint32_t hash, idx; ASSERT(array_n(&pool->server) != 0); @@ -634,10 +640,19 @@ server_pool_server(struct server_pool *pool, uint8_t *key, uint32_t keylen) default: NOT_REACHED(); - return NULL; + return 0; } ASSERT(idx < array_n(&pool->server)); + return idx; +} + +static struct server * +server_pool_server(struct server_pool *pool, uint8_t *key, uint32_t keylen) +{ + struct server *server; + uint32_t idx; + idx = server_pool_idx(pool, key, keylen); server = array_get(&pool->server, idx); log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen, diff --git a/src/nc_server.h b/src/nc_server.h index dfe16faf..9a254409 100644 --- a/src/nc_server.h +++ b/src/nc_server.h @@ -133,6 +133,8 @@ void server_close(struct context *ctx, struct conn *conn); void server_connected(struct context *ctx, struct conn *conn); void server_ok(struct context *ctx, struct conn *conn); + +uint32_t server_pool_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen); struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, uint8_t *key, uint32_t keylen); rstatus_t server_pool_run(struct server_pool *pool); rstatus_t server_pool_preconnect(struct context *ctx); diff --git a/src/nc_string.h b/src/nc_string.h index 8db44689..16eb5707 100644 --- a/src/nc_string.h +++ b/src/nc_string.h @@ -19,6 +19,7 @@ #define _NC_STRING_H_ #include +#include #include struct string { diff --git a/src/proto/nc_memcache.c b/src/proto/nc_memcache.c index 1664640b..97869271 100644 --- a/src/proto/nc_memcache.c +++ b/src/proto/nc_memcache.c @@ -73,7 +73,7 @@ memcache_cas(struct msg *r) * Return true, if the memcache command is a retrieval command, otherwise * return false */ -static bool +bool memcache_retrieval(struct msg *r) { switch (r->type) { @@ -192,6 +192,7 @@ memcache_parse_req(struct msg *r) m = r->token; r->token = NULL; r->type = MSG_UNKNOWN; + r->narg ++; switch (p - m) { @@ -307,14 +308,17 @@ memcache_parse_req(struct msg *r) case SW_SPACES_BEFORE_KEY: if (ch != ' ') { - r->token = p; - r->key_start = p; + p = p - 1; /* go back by 1 byte */ + r->token = NULL; state = SW_KEY; } - break; case SW_KEY: + if (r->token == NULL) { + r->token = p; + r->key_start = p; + } if (ch == ' ' || ch == CR) { if ((p - r->key_start) > MEMCACHE_MAX_KEY_LENGTH) { log_error("parsed bad req %"PRIu64" of type %d with key " @@ -323,6 +327,7 @@ memcache_parse_req(struct msg *r) r->key_start, p - r->key_start); goto error; } + r->narg ++; r->key_end = p; r->token = NULL; @@ -360,8 +365,10 @@ memcache_parse_req(struct msg *r) break; default: - r->token = p; - goto fragment; + r->token = NULL; + p = p - 1; /* go back by 1 byte */ + state = SW_KEY; + /*goto fragment;*/ } break; @@ -424,7 +431,6 @@ memcache_parse_req(struct msg *r) goto error; } /* vlen_start <- p */ - r->token = p; r->vlen = (uint32_t)(ch - '0'); state = SW_VLEN; } @@ -724,17 +730,17 @@ memcache_parse_rsp(struct msg *r) SW_RSP_STR, SW_SPACES_BEFORE_KEY, SW_KEY, - SW_SPACES_BEFORE_FLAGS, + SW_SPACES_BEFORE_FLAGS, /*5*/ SW_FLAGS, SW_SPACES_BEFORE_VLEN, SW_VLEN, SW_RUNTO_VAL, - SW_VAL, + SW_VAL, /*10*/ SW_VAL_LF, SW_END, SW_RUNTO_CRLF, SW_CRLF, - SW_ALMOST_DONE, + SW_ALMOST_DONE, /*15*/ SW_SENTINEL } state; @@ -795,7 +801,7 @@ memcache_parse_rsp(struct msg *r) if (ch == ' ' || ch == CR) { /* type_end <- p - 1 */ m = r->token; - r->token = NULL; + /*r->token = NULL;*/ r->type = MSG_UNKNOWN; switch (p - m) { @@ -924,21 +930,9 @@ memcache_parse_rsp(struct msg *r) break; case SW_KEY: - if (r->token == NULL) { - r->token = p; - r->key_start = p; - } - if (ch == ' ') { - if ((p - r->key_start) > MEMCACHE_MAX_KEY_LENGTH) { - log_error("parsed bad req %"PRIu64" of type %d with key " - "prefix '%.*s...' and length %d that exceeds " - "maximum key length", r->id, r->type, 16, - r->key_start, p - r->key_start); - goto error; - } r->key_end = p; - r->token = NULL; + /*r->token = NULL;*/ state = SW_SPACES_BEFORE_FLAGS; } @@ -958,7 +952,7 @@ memcache_parse_rsp(struct msg *r) case SW_FLAGS: if (r->token == NULL) { /* flags_start <- p */ - r->token = p; + /*r->token = p;*/ } if (isdigit(ch)) { @@ -966,7 +960,7 @@ memcache_parse_rsp(struct msg *r) ; } else if (ch == ' ') { /* flags_end <- p - 1 */ - r->token = NULL; + /*r->token = NULL;*/ state = SW_SPACES_BEFORE_VLEN; } else { goto error; @@ -981,21 +975,18 @@ memcache_parse_rsp(struct msg *r) } p = p - 1; /* go back by 1 byte */ state = SW_VLEN; + r->vlen = 0; } break; case SW_VLEN: - if (r->token == NULL) { - /* vlen_start <- p */ - r->token = p; - r->vlen = (uint32_t)(ch - '0'); - } else if (isdigit(ch)) { + if (isdigit(ch)) { r->vlen = r->vlen * 10 + (uint32_t)(ch - '0'); } else if (ch == ' ' || ch == CR) { /* vlen_end <- p - 1 */ p = p - 1; /* go back by 1 byte */ - r->token = NULL; + /*r->token = NULL;*/ state = SW_RUNTO_CRLF; } else { goto error; @@ -1008,6 +999,7 @@ memcache_parse_rsp(struct msg *r) case LF: /* val_start <- p + 1 */ state = SW_VAL; + r->token = NULL; break; default: @@ -1041,7 +1033,8 @@ memcache_parse_rsp(struct msg *r) case SW_VAL_LF: switch (ch) { case LF: - state = SW_END; + /*state = SW_END;*/ + state = SW_RSP_STR; break; default: @@ -1134,6 +1127,9 @@ memcache_parse_rsp(struct msg *r) r->state = state; if (b->last == b->end && r->token != NULL) { + if (state <= SW_RUNTO_VAL || state == SW_CRLF || state == SW_ALMOST_DONE){ + r->state = SW_START; + } r->pos = r->token; r->token = NULL; r->result = MSG_PARSE_REPAIR; @@ -1237,6 +1233,7 @@ memcache_pre_coalesce(struct msg *r) return; } + pr->frag_owner->nfrag_done ++; switch (r->type) { case MSG_RSP_MC_VALUE: @@ -1294,6 +1291,76 @@ memcache_pre_coalesce(struct msg *r) } } + +/* + * copy one response from src to dst + * return bytes copied + * */ +uint32_t +memcache_copy_bulk(struct msg *dst, struct msg * src){ + struct mbuf *buf, *nbuf; + uint8_t * p; + uint32_t len = 0; + uint32_t bytes = 0; + uint32_t i = 0; + + for(buf = STAILQ_FIRST(&src->mhdr); buf && (buf->pos >= buf->last); buf = STAILQ_FIRST(&src->mhdr)){ + mbuf_remove(&src->mhdr, buf); + mbuf_put(buf); + } + + buf = STAILQ_FIRST(&src->mhdr); + if (buf == NULL){ + return 0; + } + p = buf->pos; + + /*get : VALUE key 0 len\r\nv\r\n */ + /*gets: VALUE key 0 len cas\r\rnv\r\n */ + + ASSERT(*p == 'V'); + for(i = 0; i < 3; i++){ /* eat 'VALUE key 0 ' */ + for(;*p != ' ';){ + p++; + } + p++; + } + + len = 0; + for (; p < buf->last && isdigit(*p); p++) { + len = len * 10 + (uint32_t)(*p - '0'); + } + + for (; p < buf->last && ('\r' != *p); p++) { /*eat cas for gets*/ + ; + } + + len += CRLF_LEN * 2; + len += (p - buf->pos); + + bytes = len; + + /*copy len bytes to dst*/ + for(; buf ;){ + if(mbuf_length(buf) <= len){ /*steal this buf from src to dst*/ + nbuf = STAILQ_NEXT(buf, next); + mbuf_remove(&src->mhdr, buf); + mbuf_insert(&dst->mhdr, buf); + len -= mbuf_length(buf); + buf = nbuf; + }else{ /*split it*/ + nbuf = mbuf_get(); + if (nbuf == NULL) { + return -1; + } + mbuf_copy(nbuf, buf->pos, len); + mbuf_insert(&dst->mhdr, nbuf); + buf->pos += len; + break; + } + } + return bytes; +} /* * Post-coalesce handler is invoked when the message is a response to * the fragmented multi vector request - 'get' or 'gets' and all the @@ -1301,6 +1368,45 @@ memcache_pre_coalesce(struct msg *r) * the fragmented request is consider to be done */ void -memcache_post_coalesce(struct msg *r) +memcache_post_coalesce(struct msg *request) { + struct msg *response = request->peer; + struct mbuf * mbuf; + uint32_t len; + struct msg *sub_msg; + uint32_t i; + + ASSERT(!response->request); + ASSERT(request->request && request->first_fragment); + if (request->error || request->ferror) { + /* do nothing, if msg is in error */ + return; + } + + msg_reset_mbufs(response); + + for(i = 1; i < request->narg; i++){ /*for each key*/ + sub_msg = request->frag_seq[i]->peer; /*get it's peer response*/ + if(sub_msg == NULL){ + continue; /*no response because of error, we do nothing and leave it to the req_error() check in rsp_send_next*/ + } + len = memcache_copy_bulk(response, sub_msg); + ASSERT(len>=0); + log_debug(LOG_VVERB, "memcache_copy_bulk for mget copy bytes: %d", len); + response->mlen += len; + sub_msg->mlen -= len; + } + + + /*append END\r\n*/ + mbuf = mbuf_get(); + if (mbuf == NULL) { + nc_free(request->frag_seq); + return; + } + mbuf->last += nc_snprintf(mbuf->last, mbuf_size(mbuf), "END\r\n"); + response->mlen += mbuf_length(mbuf); + STAILQ_INSERT_TAIL(&response->mhdr, mbuf, next); + + nc_free(request->frag_seq); } diff --git a/src/proto/nc_proto.h b/src/proto/nc_proto.h index e6eb9800..e1e6c2f1 100644 --- a/src/proto/nc_proto.h +++ b/src/proto/nc_proto.h @@ -145,6 +145,12 @@ rstatus_t memcache_post_splitcopy(struct msg *r); void memcache_pre_coalesce(struct msg *r); void memcache_post_coalesce(struct msg *r); +uint32_t redis_copy_bulk(struct msg *dst, struct msg * src); + +bool redis_argx(struct msg *r); +bool redis_arg2x(struct msg *r); +bool memcache_retrieval(struct msg *r); + void redis_parse_req(struct msg *r); void redis_parse_rsp(struct msg *r); void redis_pre_splitcopy(struct mbuf *mbuf, void *arg); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index 0367126a..08e97a44 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -215,7 +215,7 @@ redis_argn(struct msg *r) * Return true, if the redis command is a vector command accepting one or * more keys, otherwise return false */ -static bool +bool redis_argx(struct msg *r) { switch (r->type) { @@ -230,6 +230,24 @@ redis_argx(struct msg *r) return false; } +/* + * Return true, if the redis command is a vector command accepting one or + * more key-value pairs, otherwise return false + */ +bool +redis_arg2x(struct msg *r) +{ + switch (r->type) { + case MSG_REQ_REDIS_MSET: + return true; + + default: + break; + } + + return false; +} + /* * Return true, if the redis command is either EVAL or EVALSHA. These commands * have a special format with exactly 2 arguments, followed by one or more keys, @@ -540,6 +558,10 @@ redis_parse_req(struct msg *r) r->type = MSG_REQ_REDIS_MGET; break; } + if (str4icmp(m, 'm', 's', 'e', 't')) { + r->type = MSG_REQ_REDIS_MSET; + break; + } if (str4icmp(m, 'z', 'a', 'd', 'd')) { r->type = MSG_REQ_REDIS_ZADD; @@ -1051,7 +1073,13 @@ redis_parse_req(struct msg *r) if (r->rnarg == 0) { goto done; } - state = SW_FRAGMENT; + state = SW_KEY_LEN; + /*state = SW_FRAGMENT;*/ + } else if (redis_arg2x(r)) { + if (r->rnarg == 0) { + goto done; + } + state = SW_ARG1_LEN; } else if (redis_argeval(r)) { if (r->rnarg == 0) { goto done; @@ -1155,6 +1183,11 @@ redis_parse_req(struct msg *r) goto error; } state = SW_ARG2_LEN; + } else if (redis_arg2x(r)) { + if (r->rnarg == 0) { + goto done; + } + state = SW_KEY_LEN; } else { goto error; } @@ -1923,7 +1956,7 @@ void redis_pre_splitcopy(struct mbuf *mbuf, void *arg) { struct msg *r = arg; - int n; + uint32_t n; ASSERT(r->request); ASSERT(r->narg > 1); @@ -2008,6 +2041,7 @@ redis_pre_coalesce(struct msg *r) /* do nothing, if not a response to a fragmented request */ return; } + pr->frag_owner->nfrag_done ++; switch (r->type) { case MSG_RSP_REDIS_INTEGER: @@ -2060,6 +2094,14 @@ redis_pre_coalesce(struct msg *r) } break; + case MSG_RSP_REDIS_STATUS: + if (pr->type == MSG_REQ_REDIS_MSET){ /*MSET segments*/ + mbuf = STAILQ_FIRST(&r->mhdr); + r->mlen -= mbuf_length(mbuf); + mbuf_rewind(mbuf); + } + break; + default: /* * Valid responses for a fragmented request are MSG_RSP_REDIS_INTEGER or, @@ -2075,6 +2117,126 @@ redis_pre_coalesce(struct msg *r) } } +/* + * copy one response from src to dst + * return bytes copied + * */ +uint32_t +redis_copy_bulk(struct msg *dst, struct msg * src){ + struct mbuf *buf, *nbuf; + uint8_t * p; + uint32_t len = 0; + uint32_t bytes = 0; + + for(buf = STAILQ_FIRST(&src->mhdr); buf && (buf->pos >= buf->last); buf = STAILQ_FIRST(&src->mhdr)){ + mbuf_remove(&src->mhdr, buf); + mbuf_put(buf); + } + + buf = STAILQ_FIRST(&src->mhdr); + if (buf == NULL){ + return 0; + } + + p = buf->pos; + ASSERT(*p == '$'); + p++; + + if (p[0] == '-' && p[1] == '1'){ + len = 1 + 2 + CRLF_LEN; + p = buf->pos + len; + }else{ + len = 0; + for (; p < buf->last && isdigit(*p); p++) { + len = len * 10 + (uint32_t)(*p - '0'); + } + len += CRLF_LEN * 2; + len += (p - buf->pos); + } + bytes = len; + + /*copy len bytes to dst*/ + for(; buf ;){ + if(mbuf_length(buf) <= len){ /*steal this buf from src to dst*/ + nbuf = STAILQ_NEXT(buf, next); + mbuf_remove(&src->mhdr, buf); + mbuf_insert(&dst->mhdr, buf); + len -= mbuf_length(buf); + buf = nbuf; + }else{ /*split it*/ + nbuf = mbuf_get(); + if (nbuf == NULL) { + return -1; + } + mbuf_copy(nbuf, buf->pos, len); + mbuf_insert(&dst->mhdr, nbuf); + buf->pos += len; + break; + } + } + return bytes; +} + +void +redis_post_coalesce_mset(struct msg *request) { + struct msg *response = request->peer; + struct mbuf * mbuf; + uint32_t len; + + mbuf = STAILQ_FIRST(&response->mhdr); + + len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "+OK\r\n"); + mbuf->last += len; + response->mlen += (uint32_t)len; + + nc_free(request->frag_seq); +} + +void +redis_post_coalesce_del(struct msg *request) { + struct msg *response = request->peer; + struct mbuf * mbuf; + uint32_t len; + + mbuf = STAILQ_FIRST(&response->mhdr); + + len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), ":%d\r\n", request->integer); + mbuf->last += len; + response->mlen += (uint32_t)len; + + nc_free(request->frag_seq); + +} + +static void +redis_post_coalesce_mget(struct msg *request) { + struct msg *response = request->peer; + struct msg *sub_msg; + struct mbuf * mbuf; + uint32_t len; + uint32_t i; + + mbuf = STAILQ_FIRST(&response->mhdr); + + len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", request->narg - 1); + mbuf->last += len; + response->mlen = (uint32_t)len; + + for(i = 1; i < request->narg; i++){ /*for each key*/ + sub_msg = request->frag_seq[i]->peer; /*get it's peer response*/ + if(sub_msg == NULL){ + continue; /*no response because of error, we do nothing and leave it to the req_error() check in rsp_send_next*/ + } + len = redis_copy_bulk(response, sub_msg); + ASSERT(len>=0); + log_debug(LOG_VVERB, "redis_copy_bulk for mget copy bytes: %d", len); + response->mlen += len; + sub_msg->mlen -= len; + } + + nc_free(request->frag_seq); +} + /* * Post-coalesce handler is invoked when the message is a response to * the fragmented multi vector request - 'mget' or 'del' and all the @@ -2086,44 +2248,20 @@ redis_post_coalesce(struct msg *r) { struct msg *pr = r->peer; /* peer response */ struct mbuf *mbuf; - int n; + uint32_t n; + ASSERT(!pr->request); ASSERT(r->request && r->first_fragment); if (r->error || r->ferror) { /* do nothing, if msg is in error */ return; } - ASSERT(!pr->request); - - switch (pr->type) { - case MSG_RSP_REDIS_INTEGER: - /* only redis 'del' fragmented request sends back integer reply */ - ASSERT(r->type == MSG_REQ_REDIS_DEL); - - mbuf = STAILQ_FIRST(&pr->mhdr); - - ASSERT(pr->mlen == 0); - ASSERT(mbuf_empty(mbuf)); - - n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), ":%d\r\n", r->integer); - mbuf->last += n; - pr->mlen += (uint32_t)n; - break; - - case MSG_RSP_REDIS_MULTIBULK: - /* only redis 'mget' fragmented request sends back multi-bulk reply */ - ASSERT(r->type == MSG_REQ_REDIS_MGET); - - mbuf = STAILQ_FIRST(&pr->mhdr); - ASSERT(mbuf_empty(mbuf)); - - n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", r->nfrag); - mbuf->last += n; - pr->mlen += (uint32_t)n; - break; - - default: - NOT_REACHED(); + if (r->type == MSG_REQ_REDIS_MGET) { + redis_post_coalesce_mget(r); + } else if (r->type == MSG_REQ_REDIS_DEL) { + redis_post_coalesce_del(r); + } else if (r->type == MSG_REQ_REDIS_MSET) { + redis_post_coalesce_mset(r); } } diff --git a/travis.sh b/travis.sh new file mode 100644 index 00000000..a4011f78 --- /dev/null +++ b/travis.sh @@ -0,0 +1,25 @@ +#!/bin/bash +#file : travis.sh +#author : ning +#date : 2014-05-10 16:54:43 + +#install +sudo pip install redis +sudo pip install nose +sudo pip install -e git://github.com/idning/python-memcached.git#egg=memcache +sudo apt-get install socat + +#build twemproxy +CFLAGS="-ggdb3 -O0" autoreconf -fvi && ./configure --enable-debug=log && make + +#setup test-twemproxy +git clone https://github.com/idning/test-twemproxy.git + +cp src/nutcracker test-twemproxy/_binaries/ +cp `which redis-server` test-twemproxy/_binaries/ +cp `which redis-cli` test-twemproxy/_binaries/ +cp `which memcached` test-twemproxy/_binaries/ + +#run test +cd test-twemproxy/ && nosetests --nocapture --nologcapture -v +