From 7cf5625334bee6a5c49a621966acd620f02c3340 Mon Sep 17 00:00:00 2001 From: idning Date: Fri, 28 Mar 2014 14:08:55 +0800 Subject: [PATCH 01/13] add frag_owner counter --- src/nc_message.c | 1 + src/nc_message.h | 1 + src/nc_request.c | 6 ++++++ src/proto/nc_redis.c | 1 + 4 files changed, 9 insertions(+) diff --git a/src/nc_message.c b/src/nc_message.c index 654cdf9a..f2f7fd7f 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -235,6 +235,7 @@ _msg_get(void) msg->frag_owner = NULL; msg->nfrag = 0; + msg->nfrag_done = 0; msg->frag_id = 0; msg->narg_start = NULL; diff --git a/src/nc_message.h b/src/nc_message.h index b08221f8..fb616f02 100644 --- a/src/nc_message.h +++ b/src/nc_message.h @@ -198,6 +198,7 @@ struct msg { struct msg *frag_owner; /* owner of fragment message */ uint32_t nfrag; /* # fragment */ + uint32_t nfrag_done; /* # fragment */ uint64_t frag_id; /* id of fragmented message */ err_t err; /* errno on error? */ diff --git a/src/nc_request.c b/src/nc_request.c index d89f99e1..5e759ef2 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -84,6 +84,12 @@ req_done(struct conn *conn, struct msg *msg) return true; } + /*loga("ning req_done(): msg->nfrag: %d, msg->nfrag_done:%d", msg->nfrag, msg->nfrag_done);*/ + /*msg_dump(msg);*/ + if(msg->nfrag_done < msg->nfrag){ + return false; + } + /* check all fragments of the given request vector are done */ for (pmsg = msg, cmsg = TAILQ_PREV(msg, msg_tqh, c_tqe); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index 0367126a..96854c05 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -2008,6 +2008,7 @@ redis_pre_coalesce(struct msg *r) /* do nothing, if not a response to a fragmented request */ return; } + pr->frag_owner->nfrag_done ++; switch (r->type) { case MSG_RSP_REDIS_INTEGER: From c9a36aada7813855d7ad7e4d3a38f09985a2779a Mon Sep 17 00:00:00 2001 From: idning Date: Sat, 29 Mar 2014 17:19:01 +0800 Subject: [PATCH 02/13] After rebase: mget split to many mget and send to backend reabse following:: * 0fd32bb - (HEAD, idning/mget-improve, mget-improve) unchange (10 hours ago) * 705acab - clean code (10 hours ago) * 32e8559 - correct, but not clean. (18 hours ago) * fde1f12 - mget-improve almost done. (2 days ago) * b30fa4f - mget split to many mget and send to backend (3 days ago) * 7cf5625 - add frag_owner counter (4 days ago) * 8a4f5c0 - (origin/master, origin/HEAD, idning/master) Update README.md (12 days ago) --- src/nc_log.c | 7 ++ src/nc_message.c | 201 ++++++++++++++++++++++++++++++++++++++++++- src/nc_message.h | 5 +- src/nc_request.c | 13 ++- src/nc_server.c | 16 +++- src/nc_server.h | 2 + src/proto/nc_redis.c | 92 +++++++++++++++++++- 7 files changed, 319 insertions(+), 17 deletions(-) diff --git a/src/nc_log.c b/src/nc_log.c index fa0ae543..4111c9fa 100644 --- a/src/nc_log.c +++ b/src/nc_log.c @@ -250,5 +250,12 @@ _log_hexdump(const char *file, int line, char *data, int datalen, l->nerror++; } + if (len >= size - 1){ + n = nc_write(l->fd, "\n", 1); + if (n < 0) { + l->nerror++; + } + } + errno = errno_save; } diff --git a/src/nc_message.c b/src/nc_message.c index f2f7fd7f..626be9d4 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -17,6 +17,7 @@ #include #include +#include #include @@ -234,6 +235,7 @@ _msg_get(void) msg->end = NULL; msg->frag_owner = NULL; + msg->frag_seq = NULL; msg->nfrag = 0; msg->nfrag_done = 0; msg->frag_id = 0; @@ -362,10 +364,14 @@ msg_put(struct msg *msg) } void -msg_dump(struct msg *msg) +msg_dump(struct msg *msg, int level) { struct mbuf *mbuf; + if (log_loggable(level) == 0) { + return; + } + loga("msg dump id %"PRIu64" request %d len %"PRIu32" type %d done %d " "error %d (err %d)", msg->id, msg->request, msg->mlen, msg->type, msg->done, msg->error, msg->err); @@ -378,7 +384,16 @@ msg_dump(struct msg *msg) q = mbuf->last; len = q - p; - loga_hexdump(p, len, "mbuf with %ld bytes of data", len); + loga_hexdump(p, len, "mbuf [%p] with %ld bytes of data", p, len); + } +} + +void +msg_reset_mbufs(struct msg *msg) +{ + struct mbuf *mbuf; + STAILQ_FOREACH(mbuf, &msg->mhdr, next) { + mbuf->pos = mbuf->last = mbuf->start; } } @@ -413,12 +428,192 @@ msg_empty(struct msg *msg) return msg->mlen == 0 ? true : false; } +static uint32_t +key_to_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen){ + //hash_tag + if (!string_empty(&pool->hash_tag)) { + struct string *tag = &pool->hash_tag; + uint8_t *tag_start, *tag_end; + + tag_start = nc_strchr(key, key + keylen, tag->data[0]); + if (tag_start != NULL) { + tag_end = nc_strchr(tag_start + 1, key + keylen, tag->data[1]); + if (tag_end != NULL) { + key = tag_start + 1; + keylen = (uint32_t)(tag_end - key); + } + } + } + return server_pool_idx(pool, key, keylen); +} + +/* + * parse next key in mget request, update + * r->key_start + * r->key_end + * */ +static rstatus_t +msg_fragment_mget_update_keypos(struct msg *r, struct mbuf ** in_buf){ + struct mbuf *buf; + uint8_t * p; + uint32_t len = 0; + uint32_t keylen = 0; + + for(buf = STAILQ_FIRST(&r->mhdr); buf->pos >= buf->last; buf = STAILQ_FIRST(&r->mhdr)){ + mbuf_remove(&r->mhdr, buf); + mbuf_put(buf); + } + + p = buf->pos; + ASSERT(*p == '$'); + p ++; + + len = 0; + for (; p < buf->last && isdigit(*p); p++) { + len = len * 10 + (uint32_t)(*p - '0'); + } + keylen = len; + len += (uint32_t)CRLF_LEN * 2; + len += (uint32_t)(p - buf->pos); + + if(mbuf_length(buf) < len){ //key no in this buf, remove it. + len -= mbuf_length(buf); + mbuf_remove(&r->mhdr, buf); + mbuf_put(buf); + + buf = STAILQ_FIRST(&r->mhdr); + } + + r->key_end = buf->pos + len - CRLF_LEN; + r->key_start = r->key_end - keylen; //TODO, check + buf->pos += len; + return NC_OK; +} + +static rstatus_t +msg_fragment_mget(struct context *ctx, struct conn *conn, struct msg *msg){ + struct server_pool *pool; + struct mbuf *mbuf, *sub_msg_mbuf; + struct msg ** sub_msgs; + uint32_t i; + + ASSERT(conn->client && !conn->proxy); + ASSERT(conn->owner != NULL); + + //init sub_msgs and msg->frag_seq + pool = conn->owner; + sub_msgs = nc_alloc(pool->ncontinuum * sizeof(void *) ); + for(i = 0; i < pool->ncontinuum; i++){ + sub_msgs[i] = msg_get(msg->owner, msg->request, conn->redis); + } + msg->frag_seq = nc_alloc(sizeof(struct msg *) * msg->narg); //the point for each key, point to sub_msgs elements + + mbuf = STAILQ_FIRST(&msg->mhdr); + mbuf->pos = mbuf->start; + + for(i = 0; i< 3; i++){ //eat *narg\r\n$4\r\nMGET\r\n + for(;*(mbuf->pos) != '\n';){ + mbuf->pos ++; + } + mbuf->pos ++; + } + + msg->frag_id = ++frag_id; + msg->first_fragment = 1; + msg->nfrag = 0; + msg->frag_owner = msg; + + for(i = 1; i < msg->narg; i++){ //for each key + msg_fragment_mget_update_keypos(msg, &mbuf); + + uint8_t * key = msg->key_start; + uint32_t keylen = (uint32_t)(msg->key_end - msg->key_start); + uint32_t idx = key_to_idx(pool, key, keylen); + struct msg *sub_msg = sub_msgs[idx]; + + msg->frag_seq[i] = sub_msgs[idx]; + + sub_msg->narg ++; + + if (STAILQ_EMPTY(&sub_msg->mhdr) + || mbuf_size(STAILQ_LAST(&sub_msg->mhdr, mbuf, next)) < keylen + CRLF_LEN + CRLF_LEN + 6 ){ //6 is $ len (key <= 64k, so len less then 5) + sub_msg_mbuf = mbuf_get(); + if (sub_msg_mbuf == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + mbuf_insert(&sub_msg->mhdr, sub_msg_mbuf); + }else{ + sub_msg_mbuf = STAILQ_LAST(&sub_msg->mhdr, mbuf, next); + } + + uint8_t * p = sub_msg_mbuf ->last; + + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "$%d\r\n", keylen); + sub_msg->key_start = sub_msg_mbuf->last; //update key_start + mbuf_copy(sub_msg_mbuf, key, keylen); + sub_msg->key_end = sub_msg_mbuf->last; //update key_start + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "\r\n"); + + sub_msg->mlen += (uint32_t)(sub_msg_mbuf->last - p); + } + + msg_reset_mbufs(msg); + //rewrite the orig msg to a PING cmd + mbuf = STAILQ_FIRST(&msg->mhdr); + mbuf->pos = mbuf->last = mbuf->start; + + nc_memcpy(mbuf->pos, "*1\r\n$4\r\nPING\r\n", 14); + mbuf->last = mbuf->start + 14; + msg->key_start = mbuf->start + 8; + msg->key_end = mbuf->start + 12; + msg->mlen = 14; + + conn->recv_done(ctx, conn, msg, NULL); + + for(i = 0; i < pool->ncontinuum; i++){ //prepend mget header, and forward it + struct msg* sub_msg = sub_msgs[i]; + if(STAILQ_EMPTY(&sub_msg->mhdr)){ + msg_put(sub_msg); + continue; + } + + sub_msg_mbuf = mbuf_get(); + if (sub_msg_mbuf == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$4\r\nmget\r\n", + sub_msg->narg + 1); + + STAILQ_INSERT_HEAD(&sub_msg->mhdr, sub_msg_mbuf, next); + sub_msg->type = MSG_REQ_REDIS_MGET; + sub_msg->frag_id = msg->frag_id; + sub_msg->frag_owner = msg->frag_owner; + + conn->recv_done(ctx, conn, sub_msg, NULL); + + msg->nfrag ++; + } + + nc_free(sub_msgs); + return NC_OK; +} + static rstatus_t msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) { - struct msg *nmsg; + struct msg *nmsg; //next msg struct mbuf *mbuf, *nbuf; + if(msg->type == MSG_REQ_REDIS_MGET){ + rstatus_t status = msg_fragment_mget(ctx, conn, msg); + if (status != NC_OK) { + return status; + } + return NC_OK;//TODO. + } + mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next); if (msg->pos == mbuf->last) { /* no more data to parse */ diff --git a/src/nc_message.h b/src/nc_message.h index fb616f02..b94758cd 100644 --- a/src/nc_message.h +++ b/src/nc_message.h @@ -200,6 +200,7 @@ struct msg { uint32_t nfrag; /* # fragment */ uint32_t nfrag_done; /* # fragment */ uint64_t frag_id; /* id of fragmented message */ + struct msg **frag_seq; /* sequence of fragment message, one element for each mget key*/ err_t err; /* errno on error? */ unsigned error:1; /* error? */ @@ -226,7 +227,7 @@ void msg_deinit(void); struct msg *msg_get(struct conn *conn, bool request, bool redis); void msg_put(struct msg *msg); struct msg *msg_get_error(bool redis, err_t err); -void msg_dump(struct msg *msg); +void msg_dump(struct msg *msg, int level); bool msg_empty(struct msg *msg); rstatus_t msg_recv(struct context *ctx, struct conn *conn); rstatus_t msg_send(struct context *ctx, struct conn *conn); @@ -234,6 +235,8 @@ rstatus_t msg_send(struct context *ctx, struct conn *conn); struct msg *req_get(struct conn *conn); void req_put(struct msg *msg); bool req_done(struct conn *conn, struct msg *msg); +void msg_reset_mbufs(struct msg *msg); + bool req_error(struct conn *conn, struct msg *msg); void req_server_enqueue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg); void req_server_dequeue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg); diff --git a/src/nc_request.c b/src/nc_request.c index 5e759ef2..9a11a158 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -84,8 +84,6 @@ req_done(struct conn *conn, struct msg *msg) return true; } - /*loga("ning req_done(): msg->nfrag: %d, msg->nfrag_done:%d", msg->nfrag, msg->nfrag_done);*/ - /*msg_dump(msg);*/ if(msg->nfrag_done < msg->nfrag){ return false; } @@ -110,9 +108,9 @@ req_done(struct conn *conn, struct msg *msg) } } - if (!pmsg->last_fragment) { - return false; - } + /*if (!pmsg->last_fragment) {*/ + /*return false;*/ + /*}*/ /* * At this point, all the fragments including the last fragment have @@ -139,7 +137,7 @@ req_done(struct conn *conn, struct msg *msg) nfragment++; } - ASSERT(msg->frag_owner->nfrag == nfragment); + /*ASSERT(msg->frag_owner->nfrag == nfragment);*/ msg->post_coalesce(msg->frag_owner); @@ -149,6 +147,7 @@ req_done(struct conn *conn, struct msg *msg) return true; } + /* * Return true if request is in error, false otherwise * @@ -510,7 +509,7 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg *msg, ASSERT(conn->client && !conn->proxy); ASSERT(msg->request); ASSERT(msg->owner == conn); - ASSERT(conn->rmsg == msg); + /*ASSERT(conn->rmsg == msg);*/ ASSERT(nmsg == NULL || nmsg->request); /* enqueue next message (request), if any */ diff --git a/src/nc_server.c b/src/nc_server.c index 40705167..5e3ef28a 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -608,8 +608,8 @@ server_pool_hash(struct server_pool *pool, uint8_t *key, uint32_t keylen) return pool->key_hash((char *)key, keylen); } -static struct server * -server_pool_server(struct server_pool *pool, uint8_t *key, uint32_t keylen) +uint32_t +server_pool_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen) { struct server *server; uint32_t hash, idx; @@ -634,10 +634,20 @@ server_pool_server(struct server_pool *pool, uint8_t *key, uint32_t keylen) default: NOT_REACHED(); - return NULL; + return -1; } ASSERT(idx < array_n(&pool->server)); + return idx; +} + +static struct server * +server_pool_server(struct server_pool *pool, uint8_t *key, uint32_t keylen) +{ + struct server *server; + uint32_t idx; + idx = server_pool_idx(pool, key, keylen); + ASSERT(idx >= 0); server = array_get(&pool->server, idx); log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen, diff --git a/src/nc_server.h b/src/nc_server.h index dfe16faf..9a254409 100644 --- a/src/nc_server.h +++ b/src/nc_server.h @@ -133,6 +133,8 @@ void server_close(struct context *ctx, struct conn *conn); void server_connected(struct context *ctx, struct conn *conn); void server_ok(struct context *ctx, struct conn *conn); + +uint32_t server_pool_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen); struct conn *server_pool_conn(struct context *ctx, struct server_pool *pool, uint8_t *key, uint32_t keylen); rstatus_t server_pool_run(struct server_pool *pool); rstatus_t server_pool_preconnect(struct context *ctx); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index 96854c05..78be9f53 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -1051,7 +1051,8 @@ redis_parse_req(struct msg *r) if (r->rnarg == 0) { goto done; } - state = SW_FRAGMENT; + state = SW_KEY_LEN; + /*state = SW_FRAGMENT;*/ } else if (redis_argeval(r)) { if (r->rnarg == 0) { goto done; @@ -2060,7 +2061,8 @@ redis_pre_coalesce(struct msg *r) STAILQ_INSERT_HEAD(&r->mhdr, mbuf, next); } break; - + case MSG_RSP_REDIS_STATUS: //this is the orig mget msg, PING-PONG + break; default: /* * Valid responses for a fragmented request are MSG_RSP_REDIS_INTEGER or, @@ -2076,6 +2078,85 @@ redis_pre_coalesce(struct msg *r) } } +/* + * copy one response from src to dst + * return bytes copied + * */ +uint32_t +redis_copy_bulk(struct msg *dst, struct msg * src){ + struct mbuf *buf, *nbuf; + uint8_t * p; + uint32_t len = 0; + uint32_t bytes = 0; + + buf = STAILQ_FIRST(&src->mhdr); + p = buf->pos; + ASSERT(*p == '$'); + p++; + + if (p[0] == '-' && p[1] == '1'){ + len = 1 + 2 + CRLF_LEN; + p = buf->pos + len; + }else{ + len = 0; + for (; p < buf->last && isdigit(*p); p++) { + len = len * 10 + (uint32_t)(*p - '0'); + } + len += CRLF_LEN * 2; + len += (p - buf->pos); + } + bytes = len; + + // copy len bytes to dst + for(; buf ;){ + if(mbuf_length(buf) <= len){ //move this buf from src to dst + nbuf = STAILQ_NEXT(buf, next); + mbuf_remove(&src->mhdr, buf); + mbuf_insert(&dst->mhdr, buf); + len -= mbuf_length(buf); + buf = nbuf; + }else{ //split it + nbuf = mbuf_get(); + if (nbuf == NULL) { + return -1; //TODO + } + mbuf_copy(nbuf, buf->pos, len); + mbuf_insert(&dst->mhdr, nbuf); + buf->pos += len; + break; + } + } + return bytes; +} + +void +redis_post_coalesce_mget(struct msg *request) { + struct msg *response = request->peer; + struct msg *sub_msg; + struct mbuf * mbuf; + int len; + int i; + + mbuf = STAILQ_FIRST(&response->mhdr); + mbuf->last = mbuf->pos = mbuf->start; + + len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", request->narg - 1); + mbuf->last += len; + response->mlen = (uint32_t)len; + + for(i = 1; i < request->narg; i++){ //for each key + sub_msg = request->frag_seq[i]->peer; //get it's peer response + len = redis_copy_bulk(response, sub_msg); + ASSERT(len>=0); + log_debug(LOG_VVERB, "redis_copy_bulk for mget copy bytes: %d", len); + response->mlen += len; + sub_msg->mlen -= len; + } + + nc_free(request->frag_seq); + /*msg_dump(response, LOG_VVERB);*/ +} + /* * Post-coalesce handler is invoked when the message is a response to * the fragmented multi vector request - 'mget' or 'del' and all the @@ -2119,9 +2200,14 @@ redis_post_coalesce(struct msg *r) mbuf = STAILQ_FIRST(&pr->mhdr); ASSERT(mbuf_empty(mbuf)); - n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", r->nfrag); + n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", r->narg - 1); mbuf->last += n; pr->mlen += (uint32_t)n; + pr->mlen = (uint32_t)n; + break; + + case MSG_RSP_REDIS_STATUS: //this is the orig mget msg, PING-PONG + redis_post_coalesce_mget(r); break; default: From 16180fdfc0cd1f75434385ddc3abcbcfb456a97c Mon Sep 17 00:00:00 2001 From: idning Date: Tue, 1 Apr 2014 13:06:18 +0800 Subject: [PATCH 03/13] fix multi DEL, a bug on parse key, and remove assert on conn_sendv --- scripts/test_mget.py | 54 ++++++++++++++++++++ src/nc_message.c | 114 +++++++++++++++++++++++++------------------ src/proto/nc_proto.h | 1 + src/proto/nc_redis.c | 30 ++++++++++-- 4 files changed, 146 insertions(+), 53 deletions(-) create mode 100644 scripts/test_mget.py diff --git a/scripts/test_mget.py b/scripts/test_mget.py new file mode 100644 index 00000000..dd753dac --- /dev/null +++ b/scripts/test_mget.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +#coding: utf-8 +#file : test_mget.py +#author : ning +#date : 2014-04-01 13:15:48 + +''' +usage: + +export REDIS_HOST=127.0.0.1 +export REDIS_PORT=4000 +nosetests + +''' + +import os +import redis + +host = os.environ['REDIS_HOST'] +port = int(os.environ['REDIS_PORT']) + + +def test_mget(cnt=1000): + print 'test_many', cnt + r = redis.StrictRedis(host, port) + + #insert + pipe = r.pipeline(transaction=False) + for i in range(cnt): + pipe.set('kkk-%s'%i, 'vvv-%s'%i) + pipe.execute() + + keys = ['kkk-%s' % i for i in range(cnt)] + + #mget to check + vals = r.mget(keys) + #print vals + for i in range(cnt): + assert('vvv-%s'%i == vals[i]) + + #del + assert (cnt == r.delete(*keys) ) + + #mget again + vals = r.mget(keys) + for i in range(cnt): + assert(None == vals[i]) + +def test_many_mget(): + for i in range(1, 10000, 17): + test_mget(i) + pass + + diff --git a/src/nc_message.c b/src/nc_message.c index 626be9d4..c8a6248e 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -453,7 +453,7 @@ key_to_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen){ * r->key_end * */ static rstatus_t -msg_fragment_mget_update_keypos(struct msg *r, struct mbuf ** in_buf){ +msg_fragment_argx_update_keypos(struct msg *r, struct mbuf ** in_buf){ struct mbuf *buf; uint8_t * p; uint32_t len = 0; @@ -472,11 +472,12 @@ msg_fragment_mget_update_keypos(struct msg *r, struct mbuf ** in_buf){ for (; p < buf->last && isdigit(*p); p++) { len = len * 10 + (uint32_t)(*p - '0'); } + keylen = len; len += (uint32_t)CRLF_LEN * 2; len += (uint32_t)(p - buf->pos); - if(mbuf_length(buf) < len){ //key no in this buf, remove it. + if(mbuf_length(buf) < len - CRLF_LEN){ //key no in this buf, remove it. len -= mbuf_length(buf); mbuf_remove(&r->mhdr, buf); mbuf_put(buf); @@ -485,13 +486,24 @@ msg_fragment_mget_update_keypos(struct msg *r, struct mbuf ** in_buf){ } r->key_end = buf->pos + len - CRLF_LEN; - r->key_start = r->key_end - keylen; //TODO, check + r->key_start = r->key_end - keylen; + buf->pos += len - CRLF_LEN; + + len = CRLF_LEN; + while(mbuf_length(buf) < len){ //eat CRLF + len -= mbuf_length(buf); + mbuf_remove(&r->mhdr, buf); + mbuf_put(buf); + + buf = STAILQ_FIRST(&r->mhdr); + } buf->pos += len; + return NC_OK; } static rstatus_t -msg_fragment_mget(struct context *ctx, struct conn *conn, struct msg *msg){ +msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg){ struct server_pool *pool; struct mbuf *mbuf, *sub_msg_mbuf; struct msg ** sub_msgs; @@ -524,7 +536,7 @@ msg_fragment_mget(struct context *ctx, struct conn *conn, struct msg *msg){ msg->frag_owner = msg; for(i = 1; i < msg->narg; i++){ //for each key - msg_fragment_mget_update_keypos(msg, &mbuf); + msg_fragment_argx_update_keypos(msg, &mbuf); uint8_t * key = msg->key_start; uint32_t keylen = (uint32_t)(msg->key_end - msg->key_start); @@ -569,7 +581,7 @@ msg_fragment_mget(struct context *ctx, struct conn *conn, struct msg *msg){ msg->key_end = mbuf->start + 12; msg->mlen = 14; - conn->recv_done(ctx, conn, msg, NULL); + conn->recv_done(ctx, conn, msg, nmsg); for(i = 0; i < pool->ncontinuum; i++){ //prepend mget header, and forward it struct msg* sub_msg = sub_msgs[i]; @@ -583,16 +595,21 @@ msg_fragment_mget(struct context *ctx, struct conn *conn, struct msg *msg){ nc_free(sub_msgs); return NC_ENOMEM; } - sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$4\r\nmget\r\n", - sub_msg->narg + 1); + if (msg->type == MSG_REQ_REDIS_MGET){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$4\r\nmget\r\n", + sub_msg->narg + 1); + }else if (msg->type == MSG_REQ_REDIS_DEL){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$3\r\ndel\r\n", + sub_msg->narg + 1); + } + sub_msg->mlen += mbuf_length(sub_msg_mbuf); - STAILQ_INSERT_HEAD(&sub_msg->mhdr, sub_msg_mbuf, next); - sub_msg->type = MSG_REQ_REDIS_MGET; + sub_msg->type = msg->type; sub_msg->frag_id = msg->frag_id; sub_msg->frag_owner = msg->frag_owner; + STAILQ_INSERT_HEAD(&sub_msg->mhdr, sub_msg_mbuf, next); - conn->recv_done(ctx, conn, sub_msg, NULL); - + conn->recv_done(ctx, conn, sub_msg, nmsg); msg->nfrag ++; } @@ -606,45 +623,45 @@ msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) struct msg *nmsg; //next msg struct mbuf *mbuf, *nbuf; - if(msg->type == MSG_REQ_REDIS_MGET){ - rstatus_t status = msg_fragment_mget(ctx, conn, msg); - if (status != NC_OK) { - return status; - } - return NC_OK;//TODO. - } - mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next); if (msg->pos == mbuf->last) { /* no more data to parse */ - conn->recv_done(ctx, conn, msg, NULL); - return NC_OK; - } + nmsg = NULL; + }else{ + + /* + * Input mbuf has un-parsed data. Split mbuf of the current message msg + * into (mbuf, nbuf), where mbuf is the portion of the message that has + * been parsed and nbuf is the portion of the message that is un-parsed. + * Parse nbuf as a new message nmsg in the next iteration. + */ + nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL); + if (nbuf == NULL) { + return NC_ENOMEM; + } - /* - * Input mbuf has un-parsed data. Split mbuf of the current message msg - * into (mbuf, nbuf), where mbuf is the portion of the message that has - * been parsed and nbuf is the portion of the message that is un-parsed. - * Parse nbuf as a new message nmsg in the next iteration. - */ - nbuf = mbuf_split(&msg->mhdr, msg->pos, NULL, NULL); - if (nbuf == NULL) { - return NC_ENOMEM; - } + nmsg = msg_get(msg->owner, msg->request, conn->redis); + if (nmsg == NULL) { + mbuf_put(nbuf); + return NC_ENOMEM; + } + mbuf_insert(&nmsg->mhdr, nbuf); + nmsg->pos = nbuf->pos; - nmsg = msg_get(msg->owner, msg->request, conn->redis); - if (nmsg == NULL) { - mbuf_put(nbuf); - return NC_ENOMEM; + /* update length of current (msg) and new message (nmsg) */ + nmsg->mlen = mbuf_length(nbuf); + msg->mlen -= nmsg->mlen; } - mbuf_insert(&nmsg->mhdr, nbuf); - nmsg->pos = nbuf->pos; - /* update length of current (msg) and new message (nmsg) */ - nmsg->mlen = mbuf_length(nbuf); - msg->mlen -= nmsg->mlen; - - conn->recv_done(ctx, conn, msg, nmsg); + if(redis_argx(msg)){ + rstatus_t status = msg_fragment_argx(ctx, conn, msg, nmsg); + if (status != NC_OK) { + return status; + } + return NC_OK;//TODO. + }else{ + conn->recv_done(ctx, conn, msg, nmsg); + } return NC_OK; } @@ -938,11 +955,12 @@ msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg) } } - ASSERT(!TAILQ_EMPTY(&send_msgq) && nsend != 0); - conn->smsg = NULL; - - n = conn_sendv(conn, &sendv, nsend); + if(!TAILQ_EMPTY(&send_msgq) && nsend != 0){ + n = conn_sendv(conn, &sendv, nsend); + }else{ + n = 0; + } nsent = n > 0 ? (size_t)n : 0; diff --git a/src/proto/nc_proto.h b/src/proto/nc_proto.h index e6eb9800..5d6a793c 100644 --- a/src/proto/nc_proto.h +++ b/src/proto/nc_proto.h @@ -145,6 +145,7 @@ rstatus_t memcache_post_splitcopy(struct msg *r); void memcache_pre_coalesce(struct msg *r); void memcache_post_coalesce(struct msg *r); +bool redis_argx(struct msg *r); void redis_parse_req(struct msg *r); void redis_parse_rsp(struct msg *r); void redis_pre_splitcopy(struct mbuf *mbuf, void *arg); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index 78be9f53..d5cd0f9f 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -215,7 +215,7 @@ redis_argn(struct msg *r) * Return true, if the redis command is a vector command accepting one or * more keys, otherwise return false */ -static bool +bool redis_argx(struct msg *r) { switch (r->type) { @@ -2129,6 +2129,23 @@ redis_copy_bulk(struct msg *dst, struct msg * src){ return bytes; } +void +redis_post_coalesce_del(struct msg *request) { + struct msg *response = request->peer; + struct mbuf * mbuf; + uint32_t len; + + mbuf = STAILQ_FIRST(&response->mhdr); + mbuf->last = mbuf->pos = mbuf->start; //discard PONG + + len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), ":%d\r\n", request->integer); + mbuf->last += len; + response->mlen += (uint32_t)len; + + nc_free(request->frag_seq); + +} + void redis_post_coalesce_mget(struct msg *request) { struct msg *response = request->peer; @@ -2138,7 +2155,7 @@ redis_post_coalesce_mget(struct msg *request) { int i; mbuf = STAILQ_FIRST(&response->mhdr); - mbuf->last = mbuf->pos = mbuf->start; + mbuf->last = mbuf->pos = mbuf->start; //discard PONG len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", request->narg - 1); mbuf->last += len; @@ -2200,14 +2217,17 @@ redis_post_coalesce(struct msg *r) mbuf = STAILQ_FIRST(&pr->mhdr); ASSERT(mbuf_empty(mbuf)); - n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", r->narg - 1); + n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", r->nfrag); mbuf->last += n; pr->mlen += (uint32_t)n; - pr->mlen = (uint32_t)n; break; case MSG_RSP_REDIS_STATUS: //this is the orig mget msg, PING-PONG - redis_post_coalesce_mget(r); + if (r->type == MSG_REQ_REDIS_MGET){ + redis_post_coalesce_mget(r); + }else if (r->type == MSG_REQ_REDIS_DEL){ + redis_post_coalesce_del(r); + } break; default: From db0ec4a774203c47e56d625168fca0c4aa594907 Mon Sep 17 00:00:00 2001 From: idning Date: Tue, 1 Apr 2014 17:48:03 +0800 Subject: [PATCH 04/13] add benchmark code --- .gitignore | 3 +++ scripts/benchmark-mget.py | 43 +++++++++++++++++++++++++++++++++++++++ scripts/test_mget.py | 22 +++++++++++++++++++- 3 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 scripts/benchmark-mget.py diff --git a/.gitignore b/.gitignore index 014e7e3e..2556202e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# pyc +*.pyc + # Compiled Object files *.lo *.o diff --git a/scripts/benchmark-mget.py b/scripts/benchmark-mget.py new file mode 100644 index 00000000..4fbcb6b3 --- /dev/null +++ b/scripts/benchmark-mget.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +#coding: utf-8 +#file : test_mget.py +#author : ning +#date : 2014-04-01 13:15:48 + +import os +import re +import commands + +ports = [ + 4001, # before improve + 4000, # after improve + 2000 # redis +] + +def system(cmd): + return commands.getoutput(cmd) + +def extra(regex, text): + match = re.search(regex, text, re.DOTALL) + if match: + return match.group(1) + +def testit(): + for mget_size in [10, 100, 1000, 10000]: + for port in ports: + cnt = 100*1000 / mget_size + clients = 50 + if mget_size == 10000: + clients = 2 + cmd = 'cd /home/ning/xredis/deploy-srcs/redis-2.8.3/src && ./redis-benchmark.%d -n %d -p %d -t mget -r 1000000000 -c %d' % (mget_size, cnt, port, clients) + #print cmd + rst = system(cmd) + + #100.00% <= 2 milliseconds + #28089.89 requests per second + rtime = extra('100.00% <= (\d+) milliseconds', rst) + qps = extra('([\.\d]+) requests per second', rst) + + print 'mget_size=%d on %d: pqs: %s, rtime: %s' % (mget_size, port, qps, rtime) + +testit() diff --git a/scripts/test_mget.py b/scripts/test_mget.py index dd753dac..7281a90f 100644 --- a/scripts/test_mget.py +++ b/scripts/test_mget.py @@ -47,8 +47,28 @@ def test_mget(cnt=1000): assert(None == vals[i]) def test_many_mget(): - for i in range(1, 10000, 17): + for i in range(1, 1000, 17): test_mget(i) pass +def test_large_mget(cnt=5): + r = redis.StrictRedis(host, port) + + kv = {} + for i in range(cnt): + kv['kkk-%s' % i] = os.urandom(1024*1024*8) + + #insert + for i in range(cnt): + key = 'kkk-%s' % i + r.set(key, kv[key]) + + keys = ['kkk-%s' % i for i in range(cnt)] + + #mget to check + vals = r.mget(keys) + for i in range(cnt): + key = 'kkk-%s' % i + assert(kv[key] == vals[i]) + From 9927f698c3071c30f46f52d9276506650b5df1b8 Mon Sep 17 00:00:00 2001 From: idning Date: Thu, 3 Apr 2014 16:47:45 +0800 Subject: [PATCH 05/13] MSET/MSETNX support now --- notes/redis.md | 6 +- scripts/{test_mget.py => test_mget_mset.py} | 31 +++++---- src/nc_message.c | 34 +++++++++- src/nc_message.h | 2 + src/proto/nc_proto.h | 2 + src/proto/nc_redis.c | 72 ++++++++++++++++++++- 6 files changed, 129 insertions(+), 18 deletions(-) rename scripts/{test_mget.py => test_mget_mset.py} (64%) diff --git a/notes/redis.md b/notes/redis.md index f969eb83..fba30f18 100644 --- a/notes/redis.md +++ b/notes/redis.md @@ -79,9 +79,9 @@ +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | MGET | Yes | MGET key [key ...] | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ - | MSET | No | MSET key value [key value ...] | + | MSET | Yes* | MSET key value [key value ...] | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ - | MSETNX | No | MSETNX key value [key value ...] | + | MSETNX | Yes* | MSETNX key value [key value ...] | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | PSETEX | Yes | PSETEX key milliseconds value | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ @@ -98,6 +98,8 @@ | STRLEN | Yes | STRLEN key | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ +* MSET/MSETNX support is not Atomic + ### Hashes +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ diff --git a/scripts/test_mget.py b/scripts/test_mget_mset.py similarity index 64% rename from scripts/test_mget.py rename to scripts/test_mget_mset.py index 7281a90f..f361a479 100644 --- a/scripts/test_mget.py +++ b/scripts/test_mget_mset.py @@ -20,21 +20,26 @@ port = int(os.environ['REDIS_PORT']) -def test_mget(cnt=1000): +def test_mget(cnt=10): print 'test_many', cnt r = redis.StrictRedis(host, port) - #insert - pipe = r.pipeline(transaction=False) - for i in range(cnt): - pipe.set('kkk-%s'%i, 'vvv-%s'%i) - pipe.execute() + def insert_by_pipeline(): + pipe = r.pipeline(transaction=False) + for i in range(cnt): + pipe.set('kkk-%s'%i, 'vvv-%s'%i) + pipe.execute() + + def insert_by_mset(): + kv = {'kkk-%s' % i :'vvv-%s' % i for i in range(cnt)} + ret = r.mset(**kv) + insert_by_mset() keys = ['kkk-%s' % i for i in range(cnt)] #mget to check vals = r.mget(keys) - #print vals + #print 'vals', vals for i in range(cnt): assert('vvv-%s'%i == vals[i]) @@ -47,28 +52,28 @@ def test_mget(cnt=1000): assert(None == vals[i]) def test_many_mget(): - for i in range(1, 1000, 17): + for i in range(1, 10000, 17): test_mget(i) pass - def test_large_mget(cnt=5): r = redis.StrictRedis(host, port) kv = {} for i in range(cnt): - kv['kkk-%s' % i] = os.urandom(1024*1024*8) + kv['kkx-%s' % i] = os.urandom(1024*1024*8) #insert for i in range(cnt): - key = 'kkk-%s' % i + key = 'kkx-%s' % i r.set(key, kv[key]) - keys = ['kkk-%s' % i for i in range(cnt)] + keys = ['kkx-%s' % i for i in range(cnt)] #mget to check vals = r.mget(keys) for i in range(cnt): - key = 'kkk-%s' % i + key = 'kkx-%s' % i assert(kv[key] == vals[i]) + diff --git a/src/nc_message.c b/src/nc_message.c index c8a6248e..822f54b0 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -503,7 +503,7 @@ msg_fragment_argx_update_keypos(struct msg *r, struct mbuf ** in_buf){ } static rstatus_t -msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg){ +msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg, int key_step){ struct server_pool *pool; struct mbuf *mbuf, *sub_msg_mbuf; struct msg ** sub_msgs; @@ -568,6 +568,24 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "\r\n"); sub_msg->mlen += (uint32_t)(sub_msg_mbuf->last - p); + + if(key_step == 1){ //mget,del + continue; + } else{ //mset, msetex + uint32_t len = redis_copy_bulk(sub_msg, msg); + log_debug(LOG_VVERB, "redis_copy_bulk for mset copy bytes: %d", len); + i++; + sub_msg->narg ++; + } + } + + if (STAILQ_EMPTY(&msg->mhdr)){ + mbuf = mbuf_get(); + if (mbuf == NULL) { + nc_free(msg); + return NC_ENOMEM; + } + mbuf_insert(&msg->mhdr, mbuf); } msg_reset_mbufs(msg); @@ -601,6 +619,12 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc }else if (msg->type == MSG_REQ_REDIS_DEL){ sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$3\r\ndel\r\n", sub_msg->narg + 1); + }else if (msg->type == MSG_REQ_REDIS_MSET){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$4\r\nmset\r\n", + sub_msg->narg + 1); + }else if (msg->type == MSG_REQ_REDIS_MSETNX){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$6\r\nmsetnx\r\n", + sub_msg->narg + 1); } sub_msg->mlen += mbuf_length(sub_msg_mbuf); @@ -654,7 +678,13 @@ msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) } if(redis_argx(msg)){ - rstatus_t status = msg_fragment_argx(ctx, conn, msg, nmsg); + rstatus_t status = msg_fragment_argx(ctx, conn, msg, nmsg, 1); + if (status != NC_OK) { + return status; + } + return NC_OK;//TODO. + }else if(redis_arg2x(msg)){ + rstatus_t status = msg_fragment_argx(ctx, conn, msg, nmsg, 2); if (status != NC_OK) { return status; } diff --git a/src/nc_message.h b/src/nc_message.h index b94758cd..ea862811 100644 --- a/src/nc_message.h +++ b/src/nc_message.h @@ -80,6 +80,8 @@ typedef enum msg_type { MSG_REQ_REDIS_INCRBY, MSG_REQ_REDIS_INCRBYFLOAT, MSG_REQ_REDIS_MGET, + MSG_REQ_REDIS_MSET, /*mset & msetnx*/ + MSG_REQ_REDIS_MSETNX, MSG_REQ_REDIS_PSETEX, MSG_REQ_REDIS_RESTORE, MSG_REQ_REDIS_SET, diff --git a/src/proto/nc_proto.h b/src/proto/nc_proto.h index 5d6a793c..05667a8c 100644 --- a/src/proto/nc_proto.h +++ b/src/proto/nc_proto.h @@ -145,6 +145,8 @@ rstatus_t memcache_post_splitcopy(struct msg *r); void memcache_pre_coalesce(struct msg *r); void memcache_post_coalesce(struct msg *r); +uint32_t redis_copy_bulk(struct msg *dst, struct msg * src); + bool redis_argx(struct msg *r); void redis_parse_req(struct msg *r); void redis_parse_rsp(struct msg *r); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index d5cd0f9f..c2665521 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -230,6 +230,25 @@ redis_argx(struct msg *r) return false; } +/* + * Return true, if the redis command is a vector command accepting one or + * more key-value pairs, otherwise return false + */ +bool +redis_arg2x(struct msg *r) +{ + switch (r->type) { + case MSG_REQ_REDIS_MSET: + case MSG_REQ_REDIS_MSETNX: + return true; + + default: + break; + } + + return false; +} + /* * Return true, if the redis command is either EVAL or EVALSHA. These commands * have a special format with exactly 2 arguments, followed by one or more keys, @@ -540,6 +559,10 @@ redis_parse_req(struct msg *r) r->type = MSG_REQ_REDIS_MGET; break; } + if (str4icmp(m, 'm', 's', 'e', 't')) { + r->type = MSG_REQ_REDIS_MSET; + break; + } if (str4icmp(m, 'z', 'a', 'd', 'd')) { r->type = MSG_REQ_REDIS_ZADD; @@ -632,6 +655,10 @@ redis_parse_req(struct msg *r) break; case 6: + if (str6icmp(m, 'm', 's', 'e', 't', 'n', 'x')) { + r->type = MSG_REQ_REDIS_MSETNX; + break; + } if (str6icmp(m, 'a', 'p', 'p', 'e', 'n', 'd')) { r->type = MSG_REQ_REDIS_APPEND; break; @@ -1053,6 +1080,11 @@ redis_parse_req(struct msg *r) } state = SW_KEY_LEN; /*state = SW_FRAGMENT;*/ + } else if (redis_arg2x(r)) { + if (r->rnarg == 0) { + goto done; + } + state = SW_ARG1_LEN; } else if (redis_argeval(r)) { if (r->rnarg == 0) { goto done; @@ -1156,6 +1188,11 @@ redis_parse_req(struct msg *r) goto error; } state = SW_ARG2_LEN; + } else if (redis_arg2x(r)) { + if (r->rnarg == 0) { + goto done; + } + state = SW_KEY_LEN; } else { goto error; } @@ -2061,8 +2098,17 @@ redis_pre_coalesce(struct msg *r) STAILQ_INSERT_HEAD(&r->mhdr, mbuf, next); } break; - case MSG_RSP_REDIS_STATUS: //this is the orig mget msg, PING-PONG + + case MSG_RSP_REDIS_STATUS: + if (pr->type == MSG_REQ_REDIS_MSET || pr->type == MSG_REQ_REDIS_MSETNX){ //MSET segments + mbuf = STAILQ_FIRST(&r->mhdr); + r->mlen -= mbuf_length(mbuf); + mbuf_rewind(mbuf); + }else{ //this is the orig mget msg, PING-PONG + // do nothing + } break; + default: /* * Valid responses for a fragmented request are MSG_RSP_REDIS_INTEGER or, @@ -2089,6 +2135,11 @@ redis_copy_bulk(struct msg *dst, struct msg * src){ uint32_t len = 0; uint32_t bytes = 0; + for(buf = STAILQ_FIRST(&src->mhdr); buf->pos >= buf->last; buf = STAILQ_FIRST(&src->mhdr)){ + mbuf_remove(&src->mhdr, buf); + mbuf_put(buf); + } + buf = STAILQ_FIRST(&src->mhdr); p = buf->pos; ASSERT(*p == '$'); @@ -2129,6 +2180,22 @@ redis_copy_bulk(struct msg *dst, struct msg * src){ return bytes; } +void +redis_post_coalesce_msetx(struct msg *request) { + struct msg *response = request->peer; + struct mbuf * mbuf; + uint32_t len; + + mbuf = STAILQ_FIRST(&response->mhdr); + mbuf->last = mbuf->pos = mbuf->start; //discard PONG + + len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "+OK\r\n"); + mbuf->last += len; + response->mlen += (uint32_t)len; + + nc_free(request->frag_seq); +} + void redis_post_coalesce_del(struct msg *request) { struct msg *response = request->peer; @@ -2227,7 +2294,10 @@ redis_post_coalesce(struct msg *r) redis_post_coalesce_mget(r); }else if (r->type == MSG_REQ_REDIS_DEL){ redis_post_coalesce_del(r); + }else if (r->type == MSG_REQ_REDIS_MSET || r->type == MSG_REQ_REDIS_MSETNX){ + redis_post_coalesce_msetx(r); } + break; default: From d959c85218489bda20c2e1cd567bcf00584fa766 Mon Sep 17 00:00:00 2001 From: idning Date: Mon, 21 Apr 2014 09:43:28 +0800 Subject: [PATCH 06/13] follow the coding style --- src/nc_message.c | 57 +++++++++++++++++++++++++------------------- src/proto/nc_proto.h | 1 + src/proto/nc_redis.c | 38 ++++++++++++++--------------- 3 files changed, 52 insertions(+), 44 deletions(-) diff --git a/src/nc_message.c b/src/nc_message.c index 822f54b0..0c2fadb1 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -430,7 +430,7 @@ msg_empty(struct msg *msg) static uint32_t key_to_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen){ - //hash_tag + /*hash_tag*/ if (!string_empty(&pool->hash_tag)) { struct string *tag = &pool->hash_tag; uint8_t *tag_start, *tag_end; @@ -477,7 +477,7 @@ msg_fragment_argx_update_keypos(struct msg *r, struct mbuf ** in_buf){ len += (uint32_t)CRLF_LEN * 2; len += (uint32_t)(p - buf->pos); - if(mbuf_length(buf) < len - CRLF_LEN){ //key no in this buf, remove it. + if(mbuf_length(buf) < len - CRLF_LEN){ /*key no in this buf, remove it.*/ len -= mbuf_length(buf); mbuf_remove(&r->mhdr, buf); mbuf_put(buf); @@ -490,7 +490,7 @@ msg_fragment_argx_update_keypos(struct msg *r, struct mbuf ** in_buf){ buf->pos += len - CRLF_LEN; len = CRLF_LEN; - while(mbuf_length(buf) < len){ //eat CRLF + while(mbuf_length(buf) < len){ /*eat CRLF*/ len -= mbuf_length(buf); mbuf_remove(&r->mhdr, buf); mbuf_put(buf); @@ -503,27 +503,28 @@ msg_fragment_argx_update_keypos(struct msg *r, struct mbuf ** in_buf){ } static rstatus_t -msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg, int key_step){ +msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg, uint32_t key_step){ struct server_pool *pool; struct mbuf *mbuf, *sub_msg_mbuf; struct msg ** sub_msgs; uint32_t i; + ASSERT(conn->client && !conn->proxy); ASSERT(conn->owner != NULL); - //init sub_msgs and msg->frag_seq + /*init sub_msgs and msg->frag_seq*/ pool = conn->owner; sub_msgs = nc_alloc(pool->ncontinuum * sizeof(void *) ); for(i = 0; i < pool->ncontinuum; i++){ sub_msgs[i] = msg_get(msg->owner, msg->request, conn->redis); } - msg->frag_seq = nc_alloc(sizeof(struct msg *) * msg->narg); //the point for each key, point to sub_msgs elements + msg->frag_seq = nc_alloc(sizeof(struct msg *) * msg->narg); /*the point for each key, point to sub_msgs elements*/ mbuf = STAILQ_FIRST(&msg->mhdr); mbuf->pos = mbuf->start; - for(i = 0; i< 3; i++){ //eat *narg\r\n$4\r\nMGET\r\n + for(i = 0; i< 3; i++){ /*eat *narg\r\n$4\r\nMGET\r\n*/ for(;*(mbuf->pos) != '\n';){ mbuf->pos ++; } @@ -535,20 +536,26 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc msg->nfrag = 0; msg->frag_owner = msg; - for(i = 1; i < msg->narg; i++){ //for each key - msg_fragment_argx_update_keypos(msg, &mbuf); + for(i = 1; i < msg->narg; i++){ /*for each key*/ + uint8_t * key; + uint8_t * p; + uint32_t keylen; + uint32_t idx; + struct msg *sub_msg; + uint32_t len; - uint8_t * key = msg->key_start; - uint32_t keylen = (uint32_t)(msg->key_end - msg->key_start); - uint32_t idx = key_to_idx(pool, key, keylen); - struct msg *sub_msg = sub_msgs[idx]; + msg_fragment_argx_update_keypos(msg, &mbuf); + key = msg->key_start; + keylen = (uint32_t)(msg->key_end - msg->key_start); + idx = key_to_idx(pool, key, keylen); + sub_msg = sub_msgs[idx]; msg->frag_seq[i] = sub_msgs[idx]; sub_msg->narg ++; if (STAILQ_EMPTY(&sub_msg->mhdr) - || mbuf_size(STAILQ_LAST(&sub_msg->mhdr, mbuf, next)) < keylen + CRLF_LEN + CRLF_LEN + 6 ){ //6 is $ len (key <= 64k, so len less then 5) + || mbuf_size(STAILQ_LAST(&sub_msg->mhdr, mbuf, next)) < keylen + CRLF_LEN + CRLF_LEN + 6 ){ /*6 is $ len (key <= 64k, so len less then 5)*/ sub_msg_mbuf = mbuf_get(); if (sub_msg_mbuf == NULL) { nc_free(sub_msgs); @@ -559,20 +566,20 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc sub_msg_mbuf = STAILQ_LAST(&sub_msg->mhdr, mbuf, next); } - uint8_t * p = sub_msg_mbuf ->last; + p = sub_msg_mbuf ->last; sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "$%d\r\n", keylen); - sub_msg->key_start = sub_msg_mbuf->last; //update key_start + sub_msg->key_start = sub_msg_mbuf->last; /*update key_start*/ mbuf_copy(sub_msg_mbuf, key, keylen); - sub_msg->key_end = sub_msg_mbuf->last; //update key_start + sub_msg->key_end = sub_msg_mbuf->last; /*update key_start*/ sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "\r\n"); sub_msg->mlen += (uint32_t)(sub_msg_mbuf->last - p); - if(key_step == 1){ //mget,del + if(key_step == 1){ /*mget,del*/ continue; - } else{ //mset, msetex - uint32_t len = redis_copy_bulk(sub_msg, msg); + } else{ /*mset, msetex*/ + len = redis_copy_bulk(sub_msg, msg); log_debug(LOG_VVERB, "redis_copy_bulk for mset copy bytes: %d", len); i++; sub_msg->narg ++; @@ -589,7 +596,7 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc } msg_reset_mbufs(msg); - //rewrite the orig msg to a PING cmd + /*rewrite the orig msg to a PING command*/ mbuf = STAILQ_FIRST(&msg->mhdr); mbuf->pos = mbuf->last = mbuf->start; @@ -601,7 +608,7 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc conn->recv_done(ctx, conn, msg, nmsg); - for(i = 0; i < pool->ncontinuum; i++){ //prepend mget header, and forward it + for(i = 0; i < pool->ncontinuum; i++){ /*prepend mget header, and forward it*/ struct msg* sub_msg = sub_msgs[i]; if(STAILQ_EMPTY(&sub_msg->mhdr)){ msg_put(sub_msg); @@ -644,7 +651,7 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc static rstatus_t msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) { - struct msg *nmsg; //next msg + struct msg *nmsg; /*next msg*/ struct mbuf *mbuf, *nbuf; mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next); @@ -682,13 +689,13 @@ msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) if (status != NC_OK) { return status; } - return NC_OK;//TODO. + return NC_OK; }else if(redis_arg2x(msg)){ rstatus_t status = msg_fragment_argx(ctx, conn, msg, nmsg, 2); if (status != NC_OK) { return status; } - return NC_OK;//TODO. + return NC_OK; }else{ conn->recv_done(ctx, conn, msg, nmsg); } diff --git a/src/proto/nc_proto.h b/src/proto/nc_proto.h index 05667a8c..47cff8ae 100644 --- a/src/proto/nc_proto.h +++ b/src/proto/nc_proto.h @@ -148,6 +148,7 @@ void memcache_post_coalesce(struct msg *r); uint32_t redis_copy_bulk(struct msg *dst, struct msg * src); bool redis_argx(struct msg *r); +bool redis_arg2x(struct msg *r); void redis_parse_req(struct msg *r); void redis_parse_rsp(struct msg *r); void redis_pre_splitcopy(struct mbuf *mbuf, void *arg); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index c2665521..09e1b6d9 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -1961,7 +1961,7 @@ void redis_pre_splitcopy(struct mbuf *mbuf, void *arg) { struct msg *r = arg; - int n; + uint32_t n; ASSERT(r->request); ASSERT(r->narg > 1); @@ -2100,12 +2100,12 @@ redis_pre_coalesce(struct msg *r) break; case MSG_RSP_REDIS_STATUS: - if (pr->type == MSG_REQ_REDIS_MSET || pr->type == MSG_REQ_REDIS_MSETNX){ //MSET segments + if (pr->type == MSG_REQ_REDIS_MSET || pr->type == MSG_REQ_REDIS_MSETNX){ /*MSET segments*/ mbuf = STAILQ_FIRST(&r->mhdr); r->mlen -= mbuf_length(mbuf); mbuf_rewind(mbuf); - }else{ //this is the orig mget msg, PING-PONG - // do nothing + }else{ /*this is the orig mget/mset/msetnx msg, (PONG)*/ + /*do nothing*/ } break; @@ -2158,18 +2158,18 @@ redis_copy_bulk(struct msg *dst, struct msg * src){ } bytes = len; - // copy len bytes to dst + /*copy len bytes to dst*/ for(; buf ;){ - if(mbuf_length(buf) <= len){ //move this buf from src to dst + if(mbuf_length(buf) <= len){ /*steal this buf from src to dst*/ nbuf = STAILQ_NEXT(buf, next); mbuf_remove(&src->mhdr, buf); mbuf_insert(&dst->mhdr, buf); len -= mbuf_length(buf); buf = nbuf; - }else{ //split it + }else{ /*split it*/ nbuf = mbuf_get(); if (nbuf == NULL) { - return -1; //TODO + return -1; } mbuf_copy(nbuf, buf->pos, len); mbuf_insert(&dst->mhdr, nbuf); @@ -2187,7 +2187,7 @@ redis_post_coalesce_msetx(struct msg *request) { uint32_t len; mbuf = STAILQ_FIRST(&response->mhdr); - mbuf->last = mbuf->pos = mbuf->start; //discard PONG + mbuf->last = mbuf->pos = mbuf->start; /*discard PONG*/ len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "+OK\r\n"); mbuf->last += len; @@ -2203,7 +2203,7 @@ redis_post_coalesce_del(struct msg *request) { uint32_t len; mbuf = STAILQ_FIRST(&response->mhdr); - mbuf->last = mbuf->pos = mbuf->start; //discard PONG + mbuf->last = mbuf->pos = mbuf->start; /*discard PONG*/ len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), ":%d\r\n", request->integer); mbuf->last += len; @@ -2213,23 +2213,23 @@ redis_post_coalesce_del(struct msg *request) { } -void +static void redis_post_coalesce_mget(struct msg *request) { struct msg *response = request->peer; struct msg *sub_msg; struct mbuf * mbuf; - int len; - int i; + uint32_t len; + uint32_t i; mbuf = STAILQ_FIRST(&response->mhdr); - mbuf->last = mbuf->pos = mbuf->start; //discard PONG + mbuf->last = mbuf->pos = mbuf->start; /*discard PONG*/ len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", request->narg - 1); mbuf->last += len; response->mlen = (uint32_t)len; - for(i = 1; i < request->narg; i++){ //for each key - sub_msg = request->frag_seq[i]->peer; //get it's peer response + for(i = 1; i < request->narg; i++){ /*for each key*/ + sub_msg = request->frag_seq[i]->peer; /*get it's peer response*/ len = redis_copy_bulk(response, sub_msg); ASSERT(len>=0); log_debug(LOG_VVERB, "redis_copy_bulk for mget copy bytes: %d", len); @@ -2238,7 +2238,6 @@ redis_post_coalesce_mget(struct msg *request) { } nc_free(request->frag_seq); - /*msg_dump(response, LOG_VVERB);*/ } /* @@ -2252,7 +2251,7 @@ redis_post_coalesce(struct msg *r) { struct msg *pr = r->peer; /* peer response */ struct mbuf *mbuf; - int n; + uint32_t n; ASSERT(r->request && r->first_fragment); if (r->error || r->ferror) { @@ -2289,7 +2288,8 @@ redis_post_coalesce(struct msg *r) pr->mlen += (uint32_t)n; break; - case MSG_RSP_REDIS_STATUS: //this is the orig mget msg, PING-PONG + case MSG_RSP_REDIS_STATUS: + /*this is the orig mget/mset/msetnx msg, (PONG)*/ if (r->type == MSG_REQ_REDIS_MGET){ redis_post_coalesce_mget(r); }else if (r->type == MSG_REQ_REDIS_DEL){ From 292887860f072e166a8b8d0fd6e4058b4738306f Mon Sep 17 00:00:00 2001 From: idning Date: Tue, 22 Apr 2014 16:09:14 +0800 Subject: [PATCH 07/13] enhance error handling on backend down --- src/nc_server.c | 7 +++++++ src/proto/nc_redis.c | 3 +++ 2 files changed, 10 insertions(+) diff --git a/src/nc_server.c b/src/nc_server.c index 5e3ef28a..462e460a 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -368,6 +368,10 @@ server_close(struct context *ctx, struct conn *conn) msg->error = 1; msg->err = conn->err; + if (msg->frag_owner != NULL){ + msg->frag_owner->nfrag_done ++; + } + if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { event_add_out(ctx->evb, msg->owner); } @@ -397,6 +401,9 @@ server_close(struct context *ctx, struct conn *conn) msg->done = 1; msg->error = 1; msg->err = conn->err; + if (msg->frag_owner != NULL){ + msg->frag_owner->nfrag_done ++; + } if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { event_add_out(ctx->evb, msg->owner); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index 09e1b6d9..0aa2a670 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -2230,6 +2230,9 @@ redis_post_coalesce_mget(struct msg *request) { for(i = 1; i < request->narg; i++){ /*for each key*/ sub_msg = request->frag_seq[i]->peer; /*get it's peer response*/ + if(sub_msg == NULL){ + continue; /*no response because of error, we do nothing and leave it to the req_error() check in rsp_send_next*/ + } len = redis_copy_bulk(response, sub_msg); ASSERT(len>=0); log_debug(LOG_VVERB, "redis_copy_bulk for mget copy bytes: %d", len); From 8377972ac744a44e1539ab68817685587400c14d Mon Sep 17 00:00:00 2001 From: idning Date: Sun, 4 May 2014 10:25:23 +0800 Subject: [PATCH 08/13] fix bug on msg_fragment_argx when key_len=mbuf_size-1 --- src/nc_message.c | 88 +++++++++++++++++++++++++++++++++--------------- src/nc_server.c | 4 +-- 2 files changed, 62 insertions(+), 30 deletions(-) diff --git a/src/nc_message.c b/src/nc_message.c index 0c2fadb1..68d691f9 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -492,16 +492,70 @@ msg_fragment_argx_update_keypos(struct msg *r, struct mbuf ** in_buf){ len = CRLF_LEN; while(mbuf_length(buf) < len){ /*eat CRLF*/ len -= mbuf_length(buf); - mbuf_remove(&r->mhdr, buf); - mbuf_put(buf); + buf->pos = buf->last; - buf = STAILQ_FIRST(&r->mhdr); + buf = STAILQ_NEXT(buf, next); } buf->pos += len; return NC_OK; } +static struct mbuf * +msg_ensure_mbuf(struct msg *msg, uint32_t len){ + struct mbuf *mbuf; + if (STAILQ_EMPTY(&msg->mhdr) + || mbuf_size(STAILQ_LAST(&msg->mhdr, mbuf, next)) < len ){ + mbuf = mbuf_get(); + if (mbuf == NULL) { + return NULL; + } + mbuf_insert(&msg->mhdr, mbuf); + }else{ + mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next); + } + return mbuf; +} + +static rstatus_t +msg_append_key(struct msg *msg, uint8_t * key, uint32_t keylen){ + uint32_t len; + struct mbuf *mbuf; + uint8_t printbuf[32]; + + /*1. keylen*/ + len = (uint32_t)nc_snprintf(printbuf, sizeof(printbuf), "$%d\r\n", keylen); + mbuf = msg_ensure_mbuf(msg, len); + if (mbuf == NULL) { + nc_free(msg); + return NC_ENOMEM; + } + mbuf_copy(mbuf, printbuf, len); + msg->mlen += len; + + /*2. key*/ + mbuf = msg_ensure_mbuf(msg, keylen); + if (mbuf == NULL) { + nc_free(msg); + return NC_ENOMEM; + } + msg->key_start = mbuf->last; /*update key_start*/ + mbuf_copy(mbuf, key, keylen); + msg->mlen += keylen; + msg->key_end = mbuf->last; /*update key_start*/ + + /*3. CRLF*/ + mbuf = msg_ensure_mbuf(msg, CRLF_LEN); + if (mbuf == NULL) { + nc_free(msg); + return NC_ENOMEM; + } + mbuf_copy(mbuf, (uint8_t *)CRLF, CRLF_LEN); + msg->mlen += (uint32_t)CRLF_LEN; + return NC_OK; +} + + static rstatus_t msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg, uint32_t key_step){ struct server_pool *pool; @@ -509,7 +563,6 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc struct msg ** sub_msgs; uint32_t i; - ASSERT(conn->client && !conn->proxy); ASSERT(conn->owner != NULL); @@ -538,7 +591,6 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc for(i = 1; i < msg->narg; i++){ /*for each key*/ uint8_t * key; - uint8_t * p; uint32_t keylen; uint32_t idx; struct msg *sub_msg; @@ -553,29 +605,11 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc msg->frag_seq[i] = sub_msgs[idx]; sub_msg->narg ++; - - if (STAILQ_EMPTY(&sub_msg->mhdr) - || mbuf_size(STAILQ_LAST(&sub_msg->mhdr, mbuf, next)) < keylen + CRLF_LEN + CRLF_LEN + 6 ){ /*6 is $ len (key <= 64k, so len less then 5)*/ - sub_msg_mbuf = mbuf_get(); - if (sub_msg_mbuf == NULL) { - nc_free(sub_msgs); - return NC_ENOMEM; - } - mbuf_insert(&sub_msg->mhdr, sub_msg_mbuf); - }else{ - sub_msg_mbuf = STAILQ_LAST(&sub_msg->mhdr, mbuf, next); + if (NC_OK != msg_append_key(sub_msg, key, keylen)){ + nc_free(sub_msgs); + return NC_ENOMEM; } - p = sub_msg_mbuf ->last; - - sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "$%d\r\n", keylen); - sub_msg->key_start = sub_msg_mbuf->last; /*update key_start*/ - mbuf_copy(sub_msg_mbuf, key, keylen); - sub_msg->key_end = sub_msg_mbuf->last; /*update key_start*/ - sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "\r\n"); - - sub_msg->mlen += (uint32_t)(sub_msg_mbuf->last - p); - if(key_step == 1){ /*mget,del*/ continue; } else{ /*mset, msetex*/ @@ -589,7 +623,7 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc if (STAILQ_EMPTY(&msg->mhdr)){ mbuf = mbuf_get(); if (mbuf == NULL) { - nc_free(msg); + nc_free(sub_msgs); return NC_ENOMEM; } mbuf_insert(&msg->mhdr, mbuf); diff --git a/src/nc_server.c b/src/nc_server.c index 462e460a..4ba7a72e 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -618,7 +618,6 @@ server_pool_hash(struct server_pool *pool, uint8_t *key, uint32_t keylen) uint32_t server_pool_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen) { - struct server *server; uint32_t hash, idx; ASSERT(array_n(&pool->server) != 0); @@ -641,7 +640,7 @@ server_pool_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen) default: NOT_REACHED(); - return -1; + return 0; } ASSERT(idx < array_n(&pool->server)); return idx; @@ -654,7 +653,6 @@ server_pool_server(struct server_pool *pool, uint8_t *key, uint32_t keylen) uint32_t idx; idx = server_pool_idx(pool, key, keylen); - ASSERT(idx >= 0); server = array_get(&pool->server, idx); log_debug(LOG_VERB, "key '%.*s' on dist %d maps to server '%.*s'", keylen, From 699b8d471bd8926460a715872e51211dd7bf2368 Mon Sep 17 00:00:00 2001 From: idning Date: Mon, 5 May 2014 06:12:33 +0800 Subject: [PATCH 09/13] 1. get/gets for memcache passed cases 2. fix req_done(nfrag_done) for memcache --- src/nc_core.c | 6 +- src/nc_log.h | 2 +- src/nc_message.c | 218 +++++++++++++++++++++++++++++++++++++++- src/nc_request.c | 2 +- src/proto/nc_memcache.c | 166 +++++++++++++++++++++++------- src/proto/nc_proto.h | 2 + src/proto/nc_redis.c | 6 +- 7 files changed, 359 insertions(+), 43 deletions(-) diff --git a/src/nc_core.c b/src/nc_core.c index ee3beeca..a36518a4 100644 --- a/src/nc_core.c +++ b/src/nc_core.c @@ -165,16 +165,16 @@ core_recv(struct context *ctx, struct conn *conn) return status; } -static rstatus_t +rstatus_t core_send(struct context *ctx, struct conn *conn) { rstatus_t status; status = conn->send(ctx, conn); if (status != NC_OK) { - log_debug(LOG_INFO, "send on %c %d failed: %s", + log_debug(LOG_INFO, "send on %c %d failed: status:%d errno: %d %s", conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd, - strerror(errno)); + status, errno, strerror(errno)); } return status; diff --git a/src/nc_log.h b/src/nc_log.h index 1b60c079..b05c2029 100644 --- a/src/nc_log.h +++ b/src/nc_log.h @@ -38,7 +38,7 @@ struct logger { #define LOG_VVVERB 10 /* verbose messages on ganga */ #define LOG_PVERB 11 /* periodic verbose messages on crack */ -#define LOG_MAX_LEN 256 /* max length of log message */ +#define LOG_MAX_LEN 512 /* max length of log message */ /* * log_stderr - log to stderr diff --git a/src/nc_message.c b/src/nc_message.c index 68d691f9..fe6e662f 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -453,7 +453,7 @@ key_to_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen){ * r->key_end * */ static rstatus_t -msg_fragment_argx_update_keypos(struct msg *r, struct mbuf ** in_buf){ +msg_fragment_argx_update_keypos(struct msg *r){ struct mbuf *buf; uint8_t * p; uint32_t len = 0; @@ -596,7 +596,7 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc struct msg *sub_msg; uint32_t len; - msg_fragment_argx_update_keypos(msg, &mbuf); + msg_fragment_argx_update_keypos(msg); key = msg->key_start; keylen = (uint32_t)(msg->key_end - msg->key_start); idx = key_to_idx(pool, key, keylen); @@ -682,6 +682,212 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc return NC_OK; } + + + + +/* + * parse next key in mget request, update + * r->key_start + * r->key_end + * */ +static rstatus_t +msg_fragment_retrieval_update_keypos(struct msg *r){ + struct mbuf *buf; + uint8_t * p; + + for(buf = STAILQ_FIRST(&r->mhdr); buf->pos >= buf->last; buf = STAILQ_FIRST(&r->mhdr)){ + mbuf_remove(&r->mhdr, buf); + mbuf_put(buf); + } + + p = buf->pos; + for (; p < buf->last && isspace(*p); p++) { + //eat spaces + } + + r->key_start = p; + for (; p < buf->last && !isspace(*p); p++) { + //read key + } + r->key_end = p; + + for (; p < buf->last && isspace(*p); p++) { + //eat spaces + } + buf->pos = p; + return NC_OK; +} + +static rstatus_t +msg_append_memcache_key(struct msg *msg, uint8_t * key, uint32_t keylen){ + struct mbuf *mbuf; + + mbuf = msg_ensure_mbuf(msg, keylen+2); + if (mbuf == NULL) { + nc_free(msg); + return NC_ENOMEM; + } + msg->key_start = mbuf->last; /*update key_start*/ + mbuf_copy(mbuf, key, keylen); + msg->mlen += keylen; + msg->key_end = mbuf->last; /*update key_start*/ + + mbuf_copy(mbuf, (uint8_t *)" ", 1); + msg->mlen += 1; + return NC_OK; +} + +static void +msg_get_reply(struct context *ctx, struct conn *conn, struct msg *smsg) { + struct mbuf *mbuf; + uint32_t n; + struct msg *msg; + + msg = msg_get(conn, true, conn->redis); /*replay*/ + if (msg == NULL) { + conn->err = errno; + return; + } + + mbuf = get_mbuf(msg); + if (mbuf == NULL) { + msg_put(msg); + return; + } + + smsg->peer = msg; + msg->peer = smsg; + msg->request = 0; + + /*smsg->done = 1;*/ + /*event_add_out(ctx->ep, conn);*/ + conn->dequeue_inq(ctx, conn, msg); + conn->enqueue_outq(ctx, conn, msg); +} + +static rstatus_t +msg_fragment_retrieval(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg){ + struct server_pool *pool; + struct mbuf *mbuf, *sub_msg_mbuf; + struct msg ** sub_msgs; + uint32_t i; + + ASSERT(conn->client && !conn->proxy); + ASSERT(conn->owner != NULL); + + /*init sub_msgs and msg->frag_seq*/ + pool = conn->owner; + sub_msgs = nc_alloc(pool->ncontinuum * sizeof(void *) ); + for(i = 0; i < pool->ncontinuum; i++){ + sub_msgs[i] = msg_get(msg->owner, msg->request, conn->redis); + } + + log_debug(LOG_VERB, "msg:%p, msg->narg:%d", msg, msg->narg); + msg->frag_seq = nc_alloc(sizeof(struct msg *) * msg->narg); /*the point for each key, point to sub_msgs elements*/ + + mbuf = STAILQ_FIRST(&msg->mhdr); + mbuf->pos = mbuf->start; + + for(;*(mbuf->pos) != ' ';){ /*eat 'get '*/ + mbuf->pos ++; + } + mbuf->pos ++; + + msg->frag_id = ++frag_id; + msg->first_fragment = 1; + msg->nfrag = 0; + msg->frag_owner = msg; + + for(i = 1; i < msg->narg; i++){ /*for each key*/ + uint8_t * key; + uint32_t keylen; + uint32_t idx; + struct msg *sub_msg; + uint32_t len; + + msg_fragment_retrieval_update_keypos(msg); + key = msg->key_start; + keylen = (uint32_t)(msg->key_end - msg->key_start); + idx = key_to_idx(pool, key, keylen); + sub_msg = sub_msgs[idx]; + + msg->frag_seq[i] = sub_msgs[idx]; + + sub_msg->narg ++; + if (NC_OK != msg_append_memcache_key(sub_msg, key, keylen)){ + nc_free(sub_msgs); + return NC_ENOMEM; + } + } + + if (STAILQ_EMPTY(&msg->mhdr)){ + mbuf = mbuf_get(); + if (mbuf == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + mbuf_insert(&msg->mhdr, mbuf); + } + + msg_reset_mbufs(msg); + /*rewrite the orig msg to a get xx command TODO*/ + mbuf = STAILQ_FIRST(&msg->mhdr); + mbuf->pos = mbuf->last = mbuf->start; + + nc_memcpy(mbuf->pos, "get xx\r\n", 8); + mbuf->last = mbuf->start + 8; + msg->key_start = mbuf->start + 4; + msg->key_end = mbuf->start + 6; + msg->mlen = 8; + + /*msg_dump(msg, LOG_VERB);*/ + conn->recv_done(ctx, conn, msg, nmsg); + + for(i = 0; i < pool->ncontinuum; i++){ /*prepend mget header, and forward it*/ + struct msg* sub_msg = sub_msgs[i]; + if(STAILQ_EMPTY(&sub_msg->mhdr)){ + msg_put(sub_msg); + continue; + } + + /*prepend get/gets (TODO: use a function)*/ + sub_msg_mbuf = mbuf_get(); + if (sub_msg_mbuf == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + if (msg->type == MSG_REQ_MC_GET){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "get "); + }else if (msg->type == MSG_REQ_MC_GETS){ + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "gets "); + } + sub_msg->mlen += mbuf_length(sub_msg_mbuf); + STAILQ_INSERT_HEAD(&sub_msg->mhdr, sub_msg_mbuf, next); + + /*append \r\n*/ + sub_msg_mbuf = mbuf_get(); + if (sub_msg_mbuf == NULL) { + nc_free(sub_msgs); + return NC_ENOMEM; + } + sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "\r\n"); + sub_msg->mlen += mbuf_length(sub_msg_mbuf); + STAILQ_INSERT_TAIL(&sub_msg->mhdr, sub_msg_mbuf, next); + + sub_msg->type = msg->type; + sub_msg->frag_id = msg->frag_id; + sub_msg->frag_owner = msg->frag_owner; + + /*msg_dump(sub_msg, LOG_VERB);*/ + conn->recv_done(ctx, conn, sub_msg, nmsg); + msg->nfrag ++; + } + + nc_free(sub_msgs); + return NC_OK; +} + static rstatus_t msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) { @@ -730,6 +936,12 @@ msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) return status; } return NC_OK; + }else if(memcache_retrieval(msg)){ + rstatus_t status = msg_fragment_retrieval(ctx, conn, msg, nmsg); + if (status != NC_OK) { + return status; + } + return NC_OK; }else{ conn->recv_done(ctx, conn, msg, nmsg); } @@ -1079,7 +1291,7 @@ msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg) ASSERT(TAILQ_EMPTY(&send_msgq)); - if (n > 0) { + if (n >= 0) { return NC_OK; } diff --git a/src/nc_request.c b/src/nc_request.c index 9a11a158..f140f391 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -84,7 +84,7 @@ req_done(struct conn *conn, struct msg *msg) return true; } - if(msg->nfrag_done < msg->nfrag){ + if(msg->redis && msg->nfrag_done < msg->nfrag){ return false; } diff --git a/src/proto/nc_memcache.c b/src/proto/nc_memcache.c index 1664640b..b3e9591d 100644 --- a/src/proto/nc_memcache.c +++ b/src/proto/nc_memcache.c @@ -73,7 +73,7 @@ memcache_cas(struct msg *r) * Return true, if the memcache command is a retrieval command, otherwise * return false */ -static bool +bool memcache_retrieval(struct msg *r) { switch (r->type) { @@ -192,6 +192,7 @@ memcache_parse_req(struct msg *r) m = r->token; r->token = NULL; r->type = MSG_UNKNOWN; + r->narg ++; switch (p - m) { @@ -307,14 +308,17 @@ memcache_parse_req(struct msg *r) case SW_SPACES_BEFORE_KEY: if (ch != ' ') { - r->token = p; - r->key_start = p; + p = p - 1; /* go back by 1 byte */ + r->token = NULL; state = SW_KEY; } - break; case SW_KEY: + if (r->token == NULL) { + r->token = p; + r->key_start = p; + } if (ch == ' ' || ch == CR) { if ((p - r->key_start) > MEMCACHE_MAX_KEY_LENGTH) { log_error("parsed bad req %"PRIu64" of type %d with key " @@ -323,6 +327,7 @@ memcache_parse_req(struct msg *r) r->key_start, p - r->key_start); goto error; } + r->narg ++; r->key_end = p; r->token = NULL; @@ -360,8 +365,10 @@ memcache_parse_req(struct msg *r) break; default: - r->token = p; - goto fragment; + r->token = NULL; + p = p - 1; /* go back by 1 byte */ + state = SW_KEY; + /*goto fragment;*/ } break; @@ -424,7 +431,6 @@ memcache_parse_req(struct msg *r) goto error; } /* vlen_start <- p */ - r->token = p; r->vlen = (uint32_t)(ch - '0'); state = SW_VLEN; } @@ -724,17 +730,17 @@ memcache_parse_rsp(struct msg *r) SW_RSP_STR, SW_SPACES_BEFORE_KEY, SW_KEY, - SW_SPACES_BEFORE_FLAGS, + SW_SPACES_BEFORE_FLAGS, //5 SW_FLAGS, SW_SPACES_BEFORE_VLEN, SW_VLEN, SW_RUNTO_VAL, - SW_VAL, + SW_VAL, //10 SW_VAL_LF, SW_END, SW_RUNTO_CRLF, SW_CRLF, - SW_ALMOST_DONE, + SW_ALMOST_DONE, //15 SW_SENTINEL } state; @@ -795,7 +801,7 @@ memcache_parse_rsp(struct msg *r) if (ch == ' ' || ch == CR) { /* type_end <- p - 1 */ m = r->token; - r->token = NULL; + /*r->token = NULL;*/ r->type = MSG_UNKNOWN; switch (p - m) { @@ -924,21 +930,9 @@ memcache_parse_rsp(struct msg *r) break; case SW_KEY: - if (r->token == NULL) { - r->token = p; - r->key_start = p; - } - if (ch == ' ') { - if ((p - r->key_start) > MEMCACHE_MAX_KEY_LENGTH) { - log_error("parsed bad req %"PRIu64" of type %d with key " - "prefix '%.*s...' and length %d that exceeds " - "maximum key length", r->id, r->type, 16, - r->key_start, p - r->key_start); - goto error; - } r->key_end = p; - r->token = NULL; + /*r->token = NULL;*/ state = SW_SPACES_BEFORE_FLAGS; } @@ -958,7 +952,7 @@ memcache_parse_rsp(struct msg *r) case SW_FLAGS: if (r->token == NULL) { /* flags_start <- p */ - r->token = p; + /*r->token = p;*/ } if (isdigit(ch)) { @@ -966,7 +960,7 @@ memcache_parse_rsp(struct msg *r) ; } else if (ch == ' ') { /* flags_end <- p - 1 */ - r->token = NULL; + /*r->token = NULL;*/ state = SW_SPACES_BEFORE_VLEN; } else { goto error; @@ -981,21 +975,18 @@ memcache_parse_rsp(struct msg *r) } p = p - 1; /* go back by 1 byte */ state = SW_VLEN; + r->vlen = 0; } break; case SW_VLEN: - if (r->token == NULL) { - /* vlen_start <- p */ - r->token = p; - r->vlen = (uint32_t)(ch - '0'); - } else if (isdigit(ch)) { + if (isdigit(ch)) { r->vlen = r->vlen * 10 + (uint32_t)(ch - '0'); } else if (ch == ' ' || ch == CR) { /* vlen_end <- p - 1 */ p = p - 1; /* go back by 1 byte */ - r->token = NULL; + /*r->token = NULL;*/ state = SW_RUNTO_CRLF; } else { goto error; @@ -1008,6 +999,7 @@ memcache_parse_rsp(struct msg *r) case LF: /* val_start <- p + 1 */ state = SW_VAL; + r->token = NULL; break; default: @@ -1041,7 +1033,8 @@ memcache_parse_rsp(struct msg *r) case SW_VAL_LF: switch (ch) { case LF: - state = SW_END; + /*state = SW_END;*/ + state = SW_RSP_STR; break; default: @@ -1134,6 +1127,9 @@ memcache_parse_rsp(struct msg *r) r->state = state; if (b->last == b->end && r->token != NULL) { + if (state <= SW_RUNTO_VAL || state == SW_CRLF || state == SW_ALMOST_DONE){ + r->state = SW_START; + } r->pos = r->token; r->token = NULL; r->result = MSG_PARSE_REPAIR; @@ -1294,6 +1290,76 @@ memcache_pre_coalesce(struct msg *r) } } + +/* + * copy one response from src to dst + * return bytes copied + * */ +uint32_t +memcache_copy_bulk(struct msg *dst, struct msg * src){ + struct mbuf *buf, *nbuf; + uint8_t * p; + uint32_t len = 0; + uint32_t bytes = 0; + uint32_t i = 0; + + for(buf = STAILQ_FIRST(&src->mhdr); buf && (buf->pos >= buf->last); buf = STAILQ_FIRST(&src->mhdr)){ + mbuf_remove(&src->mhdr, buf); + mbuf_put(buf); + } + + buf = STAILQ_FIRST(&src->mhdr); + if (buf == NULL){ + return 0; + } + p = buf->pos; + + /*get : VALUE key 0 len\r\nv\r\n */ + /*gets: VALUE key 0 len cas\r\rnv\r\n */ + + ASSERT(*p == 'V'); + for(i = 0; i < 3; i++){ /* eat 'VALUE key 0 ' */ + for(;*p != ' ';){ + p++; + } + p++; + } + + len = 0; + for (; p < buf->last && isdigit(*p); p++) { + len = len * 10 + (uint32_t)(*p - '0'); + } + + for (; p < buf->last && ('\r' != *p); p++) { /*eat cas for gets*/ + ; + } + + len += CRLF_LEN * 2; + len += (p - buf->pos); + + bytes = len; + + /*copy len bytes to dst*/ + for(; buf ;){ + if(mbuf_length(buf) <= len){ /*steal this buf from src to dst*/ + nbuf = STAILQ_NEXT(buf, next); + mbuf_remove(&src->mhdr, buf); + mbuf_insert(&dst->mhdr, buf); + len -= mbuf_length(buf); + buf = nbuf; + }else{ /*split it*/ + nbuf = mbuf_get(); + if (nbuf == NULL) { + return -1; + } + mbuf_copy(nbuf, buf->pos, len); + mbuf_insert(&dst->mhdr, nbuf); + buf->pos += len; + break; + } + } + return bytes; +} /* * Post-coalesce handler is invoked when the message is a response to * the fragmented multi vector request - 'get' or 'gets' and all the @@ -1301,6 +1367,38 @@ memcache_pre_coalesce(struct msg *r) * the fragmented request is consider to be done */ void -memcache_post_coalesce(struct msg *r) +memcache_post_coalesce(struct msg *request) { + struct msg *response = request->peer; + struct mbuf * mbuf; + uint32_t len; + struct msg *sub_msg; + uint32_t i; + + msg_reset_mbufs(response); + + for(i = 1; i < request->narg; i++){ /*for each key*/ + sub_msg = request->frag_seq[i]->peer; /*get it's peer response*/ + if(sub_msg == NULL){ + continue; /*no response because of error, we do nothing and leave it to the req_error() check in rsp_send_next*/ + } + len = memcache_copy_bulk(response, sub_msg); + ASSERT(len>=0); + log_debug(LOG_VVERB, "memcache_copy_bulk for mget copy bytes: %d", len); + response->mlen += len; + sub_msg->mlen -= len; + } + + + /*append END\r\n*/ + mbuf = mbuf_get(); + if (mbuf == NULL) { + nc_free(request->frag_seq); + return NC_ENOMEM; + } + mbuf->last += nc_snprintf(mbuf->last, mbuf_size(mbuf), "END\r\n"); + response->mlen += mbuf_length(mbuf); + STAILQ_INSERT_TAIL(&response->mhdr, mbuf, next); + + nc_free(request->frag_seq); } diff --git a/src/proto/nc_proto.h b/src/proto/nc_proto.h index 47cff8ae..e1e6c2f1 100644 --- a/src/proto/nc_proto.h +++ b/src/proto/nc_proto.h @@ -149,6 +149,8 @@ uint32_t redis_copy_bulk(struct msg *dst, struct msg * src); bool redis_argx(struct msg *r); bool redis_arg2x(struct msg *r); +bool memcache_retrieval(struct msg *r); + void redis_parse_req(struct msg *r); void redis_parse_rsp(struct msg *r); void redis_pre_splitcopy(struct mbuf *mbuf, void *arg); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index 0aa2a670..61b33755 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -2135,12 +2135,16 @@ redis_copy_bulk(struct msg *dst, struct msg * src){ uint32_t len = 0; uint32_t bytes = 0; - for(buf = STAILQ_FIRST(&src->mhdr); buf->pos >= buf->last; buf = STAILQ_FIRST(&src->mhdr)){ + for(buf = STAILQ_FIRST(&src->mhdr); buf && (buf->pos >= buf->last); buf = STAILQ_FIRST(&src->mhdr)){ mbuf_remove(&src->mhdr, buf); mbuf_put(buf); } buf = STAILQ_FIRST(&src->mhdr); + if (buf == NULL){ + return 0; + } + p = buf->pos; ASSERT(*p == '$'); p++; From 44e75bbcf548a24973eb978413d90c1db4d75c18 Mon Sep 17 00:00:00 2001 From: idning Date: Tue, 6 May 2014 21:59:46 +0800 Subject: [PATCH 10/13] 1. do not rewrite the orig mget as a ping msg for redis, and del useless code 2. nfrag_done work for memcache --- src/nc_core.c | 2 +- src/nc_message.c | 111 ++++++++++++++++++---------------------- src/nc_request.c | 2 +- src/nc_string.h | 1 + src/proto/nc_memcache.c | 10 +++- src/proto/nc_redis.c | 55 +++----------------- 6 files changed, 68 insertions(+), 113 deletions(-) diff --git a/src/nc_core.c b/src/nc_core.c index a36518a4..0ddd4b00 100644 --- a/src/nc_core.c +++ b/src/nc_core.c @@ -165,7 +165,7 @@ core_recv(struct context *ctx, struct conn *conn) return status; } -rstatus_t +static rstatus_t core_send(struct context *ctx, struct conn *conn) { rstatus_t status; diff --git a/src/nc_message.c b/src/nc_message.c index fe6e662f..b82bdefa 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -428,6 +428,52 @@ msg_empty(struct msg *msg) return msg->mlen == 0 ? true : false; } +static struct mbuf * +get_mbuf(struct msg *msg) +{ + struct mbuf *mbuf; + + mbuf = STAILQ_LAST(&msg->mhdr, mbuf, next); + if (mbuf == NULL || mbuf_full(mbuf)) { + mbuf = mbuf_get(); + if (mbuf == NULL) { + return NULL; + } + + mbuf_insert(&msg->mhdr, mbuf); + msg->pos = mbuf->pos; + } + ASSERT(mbuf->end - mbuf->last > 0); + return mbuf; +} + +static void +msg_make_reply(struct context *ctx, struct conn *conn, struct msg *req) { + struct mbuf *mbuf; + struct msg *msg; + + msg = msg_get(conn, true, conn->redis); /*replay*/ + if (msg == NULL) { + conn->err = errno; + return; + } + + mbuf = get_mbuf(msg); + if (mbuf == NULL) { + msg_put(msg); + return; + } + + req->peer = msg; + msg->peer = req; + msg->request = 0; + + req->done = 1; + /*event_add_out(ctx->ep, conn);*/ + /*conn->dequeue_inq(ctx, conn, req);*/ + conn->enqueue_outq(ctx, conn, req); +} + static uint32_t key_to_idx(struct server_pool *pool, uint8_t *key, uint32_t keylen){ /*hash_tag*/ @@ -620,27 +666,7 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc } } - if (STAILQ_EMPTY(&msg->mhdr)){ - mbuf = mbuf_get(); - if (mbuf == NULL) { - nc_free(sub_msgs); - return NC_ENOMEM; - } - mbuf_insert(&msg->mhdr, mbuf); - } - - msg_reset_mbufs(msg); - /*rewrite the orig msg to a PING command*/ - mbuf = STAILQ_FIRST(&msg->mhdr); - mbuf->pos = mbuf->last = mbuf->start; - - nc_memcpy(mbuf->pos, "*1\r\n$4\r\nPING\r\n", 14); - mbuf->last = mbuf->start + 14; - msg->key_start = mbuf->start + 8; - msg->key_end = mbuf->start + 12; - msg->mlen = 14; - - conn->recv_done(ctx, conn, msg, nmsg); + msg_make_reply(ctx, conn, msg) ; for(i = 0; i < pool->ncontinuum; i++){ /*prepend mget header, and forward it*/ struct msg* sub_msg = sub_msgs[i]; @@ -738,33 +764,6 @@ msg_append_memcache_key(struct msg *msg, uint8_t * key, uint32_t keylen){ return NC_OK; } -static void -msg_get_reply(struct context *ctx, struct conn *conn, struct msg *smsg) { - struct mbuf *mbuf; - uint32_t n; - struct msg *msg; - - msg = msg_get(conn, true, conn->redis); /*replay*/ - if (msg == NULL) { - conn->err = errno; - return; - } - - mbuf = get_mbuf(msg); - if (mbuf == NULL) { - msg_put(msg); - return; - } - - smsg->peer = msg; - msg->peer = smsg; - msg->request = 0; - - /*smsg->done = 1;*/ - /*event_add_out(ctx->ep, conn);*/ - conn->dequeue_inq(ctx, conn, msg); - conn->enqueue_outq(ctx, conn, msg); -} static rstatus_t msg_fragment_retrieval(struct context *ctx, struct conn *conn, struct msg *msg, struct msg *nmsg){ @@ -804,7 +803,6 @@ msg_fragment_retrieval(struct context *ctx, struct conn *conn, struct msg *msg, uint32_t keylen; uint32_t idx; struct msg *sub_msg; - uint32_t len; msg_fragment_retrieval_update_keypos(msg); key = msg->key_start; @@ -830,19 +828,8 @@ msg_fragment_retrieval(struct context *ctx, struct conn *conn, struct msg *msg, mbuf_insert(&msg->mhdr, mbuf); } - msg_reset_mbufs(msg); - /*rewrite the orig msg to a get xx command TODO*/ - mbuf = STAILQ_FIRST(&msg->mhdr); - mbuf->pos = mbuf->last = mbuf->start; - - nc_memcpy(mbuf->pos, "get xx\r\n", 8); - mbuf->last = mbuf->start + 8; - msg->key_start = mbuf->start + 4; - msg->key_end = mbuf->start + 6; - msg->mlen = 8; - - /*msg_dump(msg, LOG_VERB);*/ - conn->recv_done(ctx, conn, msg, nmsg); + /*conn->recv_done(ctx, conn, msg, nmsg);*/ + msg_make_reply(ctx, conn, msg); for(i = 0; i < pool->ncontinuum; i++){ /*prepend mget header, and forward it*/ struct msg* sub_msg = sub_msgs[i]; diff --git a/src/nc_request.c b/src/nc_request.c index f140f391..9a11a158 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -84,7 +84,7 @@ req_done(struct conn *conn, struct msg *msg) return true; } - if(msg->redis && msg->nfrag_done < msg->nfrag){ + if(msg->nfrag_done < msg->nfrag){ return false; } diff --git a/src/nc_string.h b/src/nc_string.h index 8db44689..16eb5707 100644 --- a/src/nc_string.h +++ b/src/nc_string.h @@ -19,6 +19,7 @@ #define _NC_STRING_H_ #include +#include #include struct string { diff --git a/src/proto/nc_memcache.c b/src/proto/nc_memcache.c index b3e9591d..ec64643f 100644 --- a/src/proto/nc_memcache.c +++ b/src/proto/nc_memcache.c @@ -1233,6 +1233,7 @@ memcache_pre_coalesce(struct msg *r) return; } + pr->frag_owner->nfrag_done ++; switch (r->type) { case MSG_RSP_MC_VALUE: @@ -1375,6 +1376,13 @@ memcache_post_coalesce(struct msg *request) struct msg *sub_msg; uint32_t i; + ASSERT(!response->request); + ASSERT(request->request && request->first_fragment); + if (request->error || request->ferror) { + /* do nothing, if msg is in error */ + return; + } + msg_reset_mbufs(response); for(i = 1; i < request->narg; i++){ /*for each key*/ @@ -1394,7 +1402,7 @@ memcache_post_coalesce(struct msg *request) mbuf = mbuf_get(); if (mbuf == NULL) { nc_free(request->frag_seq); - return NC_ENOMEM; + return; } mbuf->last += nc_snprintf(mbuf->last, mbuf_size(mbuf), "END\r\n"); response->mlen += mbuf_length(mbuf); diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index 61b33755..277621db 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -2104,8 +2104,6 @@ redis_pre_coalesce(struct msg *r) mbuf = STAILQ_FIRST(&r->mhdr); r->mlen -= mbuf_length(mbuf); mbuf_rewind(mbuf); - }else{ /*this is the orig mget/mset/msetnx msg, (PONG)*/ - /*do nothing*/ } break; @@ -2191,7 +2189,6 @@ redis_post_coalesce_msetx(struct msg *request) { uint32_t len; mbuf = STAILQ_FIRST(&response->mhdr); - mbuf->last = mbuf->pos = mbuf->start; /*discard PONG*/ len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "+OK\r\n"); mbuf->last += len; @@ -2207,7 +2204,6 @@ redis_post_coalesce_del(struct msg *request) { uint32_t len; mbuf = STAILQ_FIRST(&response->mhdr); - mbuf->last = mbuf->pos = mbuf->start; /*discard PONG*/ len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), ":%d\r\n", request->integer); mbuf->last += len; @@ -2226,7 +2222,6 @@ redis_post_coalesce_mget(struct msg *request) { uint32_t i; mbuf = STAILQ_FIRST(&response->mhdr); - mbuf->last = mbuf->pos = mbuf->start; /*discard PONG*/ len = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", request->narg - 1); mbuf->last += len; @@ -2260,54 +2255,18 @@ redis_post_coalesce(struct msg *r) struct mbuf *mbuf; uint32_t n; + ASSERT(!pr->request); ASSERT(r->request && r->first_fragment); if (r->error || r->ferror) { /* do nothing, if msg is in error */ return; } - ASSERT(!pr->request); - - switch (pr->type) { - case MSG_RSP_REDIS_INTEGER: - /* only redis 'del' fragmented request sends back integer reply */ - ASSERT(r->type == MSG_REQ_REDIS_DEL); - - mbuf = STAILQ_FIRST(&pr->mhdr); - - ASSERT(pr->mlen == 0); - ASSERT(mbuf_empty(mbuf)); - - n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), ":%d\r\n", r->integer); - mbuf->last += n; - pr->mlen += (uint32_t)n; - break; - - case MSG_RSP_REDIS_MULTIBULK: - /* only redis 'mget' fragmented request sends back multi-bulk reply */ - ASSERT(r->type == MSG_REQ_REDIS_MGET); - - mbuf = STAILQ_FIRST(&pr->mhdr); - ASSERT(mbuf_empty(mbuf)); - - n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "*%d\r\n", r->nfrag); - mbuf->last += n; - pr->mlen += (uint32_t)n; - break; - - case MSG_RSP_REDIS_STATUS: - /*this is the orig mget/mset/msetnx msg, (PONG)*/ - if (r->type == MSG_REQ_REDIS_MGET){ - redis_post_coalesce_mget(r); - }else if (r->type == MSG_REQ_REDIS_DEL){ - redis_post_coalesce_del(r); - }else if (r->type == MSG_REQ_REDIS_MSET || r->type == MSG_REQ_REDIS_MSETNX){ - redis_post_coalesce_msetx(r); - } - - break; - - default: - NOT_REACHED(); + if (r->type == MSG_REQ_REDIS_MGET){ + redis_post_coalesce_mget(r); + }else if (r->type == MSG_REQ_REDIS_DEL){ + redis_post_coalesce_del(r); + }else if (r->type == MSG_REQ_REDIS_MSET || r->type == MSG_REQ_REDIS_MSETNX){ + redis_post_coalesce_msetx(r); } } From 6ae11c5497934944524c3da7570a2c6dfcec9025 Mon Sep 17 00:00:00 2001 From: idning Date: Fri, 9 May 2014 07:26:30 +0800 Subject: [PATCH 11/13] MSETNX should not support MSETNX Return: - 1 if the all the keys were set. - 0 if no key was set (at least one key already existed). --- notes/redis.md | 4 +--- src/nc_message.c | 9 +++------ src/nc_message.h | 3 +-- src/proto/nc_memcache.c | 6 +++--- src/proto/nc_redis.c | 17 ++++++----------- 5 files changed, 14 insertions(+), 25 deletions(-) diff --git a/notes/redis.md b/notes/redis.md index fba30f18..1ecb1e83 100644 --- a/notes/redis.md +++ b/notes/redis.md @@ -81,8 +81,6 @@ +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | MSET | Yes* | MSET key value [key value ...] | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ - | MSETNX | Yes* | MSETNX key value [key value ...] | - +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | PSETEX | Yes | PSETEX key milliseconds value | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ | SET | Yes | SET key value [EX seconds] [PX milliseconds] [NX|XX] | @@ -98,7 +96,7 @@ | STRLEN | Yes | STRLEN key | +-------------------+------------+---------------------------------------------------------------------------------------------------------------------+ -* MSET/MSETNX support is not Atomic +* MSET support is not Atomic ### Hashes diff --git a/src/nc_message.c b/src/nc_message.c index b82bdefa..139305a4 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -689,9 +689,6 @@ msg_fragment_argx(struct context *ctx, struct conn *conn, struct msg *msg, struc }else if (msg->type == MSG_REQ_REDIS_MSET){ sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$4\r\nmset\r\n", sub_msg->narg + 1); - }else if (msg->type == MSG_REQ_REDIS_MSETNX){ - sub_msg_mbuf->last += nc_snprintf(sub_msg_mbuf->last, mbuf_size(sub_msg_mbuf), "*%d\r\n$6\r\nmsetnx\r\n", - sub_msg->narg + 1); } sub_msg->mlen += mbuf_length(sub_msg_mbuf); @@ -729,17 +726,17 @@ msg_fragment_retrieval_update_keypos(struct msg *r){ p = buf->pos; for (; p < buf->last && isspace(*p); p++) { - //eat spaces + /*eat spaces*/ } r->key_start = p; for (; p < buf->last && !isspace(*p); p++) { - //read key + /*read key*/ } r->key_end = p; for (; p < buf->last && isspace(*p); p++) { - //eat spaces + /*eat spaces*/ } buf->pos = p; return NC_OK; diff --git a/src/nc_message.h b/src/nc_message.h index ea862811..dd32a456 100644 --- a/src/nc_message.h +++ b/src/nc_message.h @@ -80,8 +80,7 @@ typedef enum msg_type { MSG_REQ_REDIS_INCRBY, MSG_REQ_REDIS_INCRBYFLOAT, MSG_REQ_REDIS_MGET, - MSG_REQ_REDIS_MSET, /*mset & msetnx*/ - MSG_REQ_REDIS_MSETNX, + MSG_REQ_REDIS_MSET, /*mset*/ MSG_REQ_REDIS_PSETEX, MSG_REQ_REDIS_RESTORE, MSG_REQ_REDIS_SET, diff --git a/src/proto/nc_memcache.c b/src/proto/nc_memcache.c index ec64643f..97869271 100644 --- a/src/proto/nc_memcache.c +++ b/src/proto/nc_memcache.c @@ -730,17 +730,17 @@ memcache_parse_rsp(struct msg *r) SW_RSP_STR, SW_SPACES_BEFORE_KEY, SW_KEY, - SW_SPACES_BEFORE_FLAGS, //5 + SW_SPACES_BEFORE_FLAGS, /*5*/ SW_FLAGS, SW_SPACES_BEFORE_VLEN, SW_VLEN, SW_RUNTO_VAL, - SW_VAL, //10 + SW_VAL, /*10*/ SW_VAL_LF, SW_END, SW_RUNTO_CRLF, SW_CRLF, - SW_ALMOST_DONE, //15 + SW_ALMOST_DONE, /*15*/ SW_SENTINEL } state; diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index 277621db..08e97a44 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -239,7 +239,6 @@ redis_arg2x(struct msg *r) { switch (r->type) { case MSG_REQ_REDIS_MSET: - case MSG_REQ_REDIS_MSETNX: return true; default: @@ -655,10 +654,6 @@ redis_parse_req(struct msg *r) break; case 6: - if (str6icmp(m, 'm', 's', 'e', 't', 'n', 'x')) { - r->type = MSG_REQ_REDIS_MSETNX; - break; - } if (str6icmp(m, 'a', 'p', 'p', 'e', 'n', 'd')) { r->type = MSG_REQ_REDIS_APPEND; break; @@ -2100,7 +2095,7 @@ redis_pre_coalesce(struct msg *r) break; case MSG_RSP_REDIS_STATUS: - if (pr->type == MSG_REQ_REDIS_MSET || pr->type == MSG_REQ_REDIS_MSETNX){ /*MSET segments*/ + if (pr->type == MSG_REQ_REDIS_MSET){ /*MSET segments*/ mbuf = STAILQ_FIRST(&r->mhdr); r->mlen -= mbuf_length(mbuf); mbuf_rewind(mbuf); @@ -2183,7 +2178,7 @@ redis_copy_bulk(struct msg *dst, struct msg * src){ } void -redis_post_coalesce_msetx(struct msg *request) { +redis_post_coalesce_mset(struct msg *request) { struct msg *response = request->peer; struct mbuf * mbuf; uint32_t len; @@ -2262,11 +2257,11 @@ redis_post_coalesce(struct msg *r) return; } - if (r->type == MSG_REQ_REDIS_MGET){ + if (r->type == MSG_REQ_REDIS_MGET) { redis_post_coalesce_mget(r); - }else if (r->type == MSG_REQ_REDIS_DEL){ + } else if (r->type == MSG_REQ_REDIS_DEL) { redis_post_coalesce_del(r); - }else if (r->type == MSG_REQ_REDIS_MSET || r->type == MSG_REQ_REDIS_MSETNX){ - redis_post_coalesce_msetx(r); + } else if (r->type == MSG_REQ_REDIS_MSET) { + redis_post_coalesce_mset(r); } } From 1d5d0d26aa7d44c0b738088b178774d4923de7a8 Mon Sep 17 00:00:00 2001 From: idning Date: Sat, 10 May 2014 16:57:15 +0800 Subject: [PATCH 12/13] add travis.sh --- .travis.yml | 2 +- travis.sh | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 travis.sh diff --git a/.travis.yml b/.travis.yml index b0de971d..47ca5641 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,3 @@ language: c -script: CFLAGS="-ggdb3 -O0" autoreconf -fvi && ./configure --enable-debug=log && make && sudo make install +script: sh ./travis.sh diff --git a/travis.sh b/travis.sh new file mode 100644 index 00000000..904eab6b --- /dev/null +++ b/travis.sh @@ -0,0 +1,22 @@ +#!/bin/bash +#file : travis.sh +#author : ning +#date : 2014-05-10 16:54:43 + +DATE=`date +'%Y%m%d%H%M'` +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +pip install nose +pip install redis +git clone https://github.com/idning/python-memcached +cd python-memcached && python setup.py install + + +CFLAGS="-ggdb3 -O0" autoreconf -fvi && ./configure --enable-debug=log && make && sudo make install + +git clone git@github.com:idning/test-twemproxy.git + +cp src/nutcracker test-twemproxy/_binaries/ + +cd test-twemproxy && nosetests -v + From 3818acadd4ad26e9477e0553b04f4be8226773ab Mon Sep 17 00:00:00 2001 From: idning Date: Sat, 10 May 2014 17:03:10 +0800 Subject: [PATCH 13/13] setup travis-ci with test-twemproxy update .travis.yam update travis update travis update travis update travis update travis update travis update travis update travis update travis update travis update travis update travis update travis update travis update travis update travis update travis update travis --- .travis.yml | 2 +- travis.sh | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/.travis.yml b/.travis.yml index 47ca5641..254d91a0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,3 @@ language: c -script: sh ./travis.sh +script: bash ./travis.sh diff --git a/travis.sh b/travis.sh index 904eab6b..a4011f78 100644 --- a/travis.sh +++ b/travis.sh @@ -3,20 +3,23 @@ #author : ning #date : 2014-05-10 16:54:43 -DATE=`date +'%Y%m%d%H%M'` -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +#install +sudo pip install redis +sudo pip install nose +sudo pip install -e git://github.com/idning/python-memcached.git#egg=memcache +sudo apt-get install socat -pip install nose -pip install redis -git clone https://github.com/idning/python-memcached -cd python-memcached && python setup.py install +#build twemproxy +CFLAGS="-ggdb3 -O0" autoreconf -fvi && ./configure --enable-debug=log && make - -CFLAGS="-ggdb3 -O0" autoreconf -fvi && ./configure --enable-debug=log && make && sudo make install - -git clone git@github.com:idning/test-twemproxy.git +#setup test-twemproxy +git clone https://github.com/idning/test-twemproxy.git cp src/nutcracker test-twemproxy/_binaries/ +cp `which redis-server` test-twemproxy/_binaries/ +cp `which redis-cli` test-twemproxy/_binaries/ +cp `which memcached` test-twemproxy/_binaries/ -cd test-twemproxy && nosetests -v +#run test +cd test-twemproxy/ && nosetests --nocapture --nologcapture -v