diff --git a/Makefile b/Makefile index 472a6d4..5fc12a6 100644 --- a/Makefile +++ b/Makefile @@ -1,34 +1,36 @@ CC=gcc +CFLAGS=-std=gnu99 -O3 -Wall -Wextra -g +LDFLAGS=-g +LOADLIBS=-lpthread +all : bin/test_p1c1 bin/test_p4c4 bin/test_p100c10 bin/test_p10c100 bin/example + +bin/example: example.c liblfq.so.1.0.0 + gcc $(CFLAGS) $(LDFLAGS) example.c lfq.c -o bin/example -lpthread + +bin/test_p1c1: liblfq.so.1.0.0 test_multithread.c + gcc $(CFLAGS) $(LDFLAGS) test_multithread.c -o bin/test_p1c1 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=1 -D MAX_CONSUMER=1 + +bin/test_p4c4: liblfq.so.1.0.0 test_multithread.c + gcc $(CFLAGS) $(LDFLAGS) test_multithread.c -o bin/test_p4c4 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=4 -D MAX_CONSUMER=4 + +bin/test_p100c10: liblfq.so.1.0.0 test_multithread.c + gcc $(CFLAGS) $(LDFLAGS) test_multithread.c -o bin/test_p100c10 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=100 -D MAX_CONSUMER=10 + +bin/test_p10c100: liblfq.so.1.0.0 test_multithread.c + gcc $(CFLAGS) $(LDFLAGS) test_multithread.c -o bin/test_p10c100 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=10 -D MAX_CONSUMER=100 + +liblfq.so.1.0.0: lfq.c lfq.h cross-platform.h + gcc $(CFLAGS) $(CPPFLAGS) -c lfq.c # -fno-pie for static linking? + ar rcs liblfq.a lfq.o + gcc $(CFLAGS) $(CPPFLAGS) -fPIC -c lfq.c + gcc $(LDFLAGS) -shared -o liblfq.so.1.0.0 lfq.o + +test: bin/test_p1c1 bin/test_p4c4 bin/test_p100c10 bin/test_p10c100 + $(TESTWRAPPER) bin/test_p1c1 + $(TESTWRAPPER) bin/test_p4c4 + $(TESTWRAPPER) bin/test_p100c10 + $(TESTWRAPPER) bin/test_p10c100 -all : p1c1 p4c4 p100c10 p10c100 example - -example: liblfq - gcc -std=c99 -g example.c lfq.c -o bin/example -lpthread - -p1c1: liblfq - gcc -std=c99 -g test_multithread.c -o bin/test_p1c1 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=1 -D MAX_CONSUMER=1 - -p4c4: liblfq - gcc -std=c99 -g test_multithread.c -o bin/test_p4c4 -L. 
-Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=4 -D MAX_CONSUMER=4 - -p100c10: liblfq - gcc -std=c99 -g test_multithread.c -o bin/test_p100c10 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=100 -D MAX_CONSUMER=10 - -p10c100: liblfq - gcc -std=c99 -g test_multithread.c -o bin/test_p10c100 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=10 -D MAX_CONSUMER=100 - -liblfq: lfq.c - @gcc -std=c99 -c -O3 lfq.c -lpthread - @ar rcs liblfq.a lfq.o - @gcc -std=c99 -fPIC -c lfq.c - @gcc -shared -o liblfq.so.1.0.0 lfq.o - -test: - @bin/test_p1c1 - @bin/test_p4c4 - @bin/test_p100c10 - @bin/test_p10c100 - clean: rm -rf *.o bin/* liblfq.so.1.0.0 liblfq.a diff --git a/cross-platform.h b/cross-platform.h index be5de23..85ca47d 100644 --- a/cross-platform.h +++ b/cross-platform.h @@ -38,15 +38,20 @@ #define ATOMIC_SET __sync_lock_test_and_set #define ATOMIC_RELEASE __sync_lock_release -#if defined __GNUC__ || defined __CYGWIN__ || defined __MINGW32__ +#if defined __GNUC__ #define ATOMIC_SUB __sync_sub_and_fetch #define ATOMIC_SUB64 ATOMIC_SUB #define CAS __sync_bool_compare_and_swap +#define XCHG __sync_lock_test_and_set // yes really. The 2nd arg is limited to 1 on machines with TAS but not XCHG. On x86 it's an arbitrary value #define ATOMIC_ADD __sync_add_and_fetch #define ATOMIC_ADD64 ATOMIC_ADD #define mb __sync_synchronize - #define lmb() asm volatile( "lfence" ) - #define smb() asm volatile( "sfence" ) +#if defined(__x86_64__) || defined(__i386) +// #define lmb() asm volatile( "lfence" ) +// #define smb() asm volatile( "sfence" ) + #define lmb() asm volatile("":::"memory") // compiler barrier only. 
runtime LoadLoad/StoreStore reordering already impossible on x86 + #define smb() asm volatile("":::"memory") +#endif // else no definition // thread #include diff --git a/lfq.c b/lfq.c index 8ce9dc9..243577b 100755 --- a/lfq.c +++ b/lfq.c @@ -1,8 +1,10 @@ #include "cross-platform.h" #include "lfq.h" +#include <assert.h> #include #define MAXFREE 150 +static int inHP(struct lfq_ctx *ctx, struct lfq_node * lfn) { for ( int i = 0 ; i < ctx->MAXHPSIZE ; i++ ) { lmb(); @@ -12,6 +14,7 @@ int inHP(struct lfq_ctx *ctx, struct lfq_node * lfn) { return 0; } +static void enpool(struct lfq_ctx *ctx, struct lfq_node * lfn) { volatile struct lfq_node * p; do { @@ -20,6 +23,7 @@ void enpool(struct lfq_ctx *ctx, struct lfq_node * lfn) { p->free_next = lfn; } +static void free_pool(struct lfq_ctx *ctx, bool freeall ) { if (!CAS(&ctx->is_freeing, 0, 1)) return; // this pool free is not support multithreading. @@ -37,20 +41,23 @@ void free_pool(struct lfq_ctx *ctx, bool freeall ) { smb(); } +static void safe_free(struct lfq_ctx *ctx, struct lfq_node * lfn) { if (lfn->can_free && !inHP(ctx,lfn)) { - // free is not thread safety + // free is not thread-safe if (CAS(&ctx->is_freeing, 0, 1)) { - free(lfn); + lfn->next = (void*)-1; // poison the pointer to detect use-after-free + free(lfn); // we got the lock; actually free ctx->is_freeing = false; smb(); - } else + } else // we didn't get the lock; only add to a freelist enpool(ctx, lfn); } else enpool(ctx, lfn); free_pool(ctx, false); } +static int alloc_tid(struct lfq_ctx *ctx) { for (int i = 0; i < ctx->MAXHPSIZE; i++) if (ctx->tid_map[i] == 0) @@ -60,6 +67,7 @@ int alloc_tid(struct lfq_ctx *ctx) { return -1; } +static void free_tid(struct lfq_ctx *ctx, int tid) { ctx->tid_map[tid]=0; } @@ -84,6 +92,24 @@ int lfq_init(struct lfq_ctx *ctx, int max_consume_thread) { return 0; } + +long lfg_count_freelist(const struct lfq_ctx *ctx) { + long count=0; + struct lfq_node *p = (struct lfq_node *)ctx->fph; // non-volatile + while(p) { + count++; + p = p->free_next; + } +/* + 
while(pn = p->free_next) { + free(p); + p = pn; + count++; + } +*/ + return count; +} + int lfq_clean(struct lfq_ctx *ctx){ if ( ctx->tail && ctx->head ) { // if have data in queue struct lfq_node *tmp; @@ -141,6 +167,7 @@ void * lfq_dequeue_tid(struct lfq_ctx *ctx, int tid ) { ctx->HP[tid] = 0; return 0; } + assert(pn != (void*)-1 && "read an already-freed node"); } while( ! CAS(&ctx->head, p, pn) ); mb(); ctx->HP[tid] = 0; diff --git a/lfq.h b/lfq.h index 8d3fe68..b2d4198 100755 --- a/lfq.h +++ b/lfq.h @@ -2,6 +2,8 @@ #define __LFQ_H__ #include "cross-platform.h" +#include <stdalign.h> // C11 + struct lfq_node{ void * data; struct lfq_node * volatile next; @@ -10,8 +12,7 @@ struct lfq_node{ }; struct lfq_ctx{ - volatile struct lfq_node * volatile head; - volatile struct lfq_node * volatile tail; + alignas(64) volatile struct lfq_node * volatile head; int volatile count; volatile struct lfq_node * * HP; volatile int * tid_map; @@ -19,10 +20,14 @@ struct lfq_ctx{ volatile struct lfq_node * volatile fph; // free pool head volatile struct lfq_node * volatile fpt; // free pool tail int MAXHPSIZE; + + alignas(64) volatile struct lfq_node * volatile tail; // in another cache line to avoid contention }; int lfq_init(struct lfq_ctx *ctx, int max_consume_thread); int lfq_clean(struct lfq_ctx *ctx); +long lfg_count_freelist(const struct lfq_ctx *ctx); + int lfq_enqueue(struct lfq_ctx *ctx, void * data); void * lfq_dequeue_tid(struct lfq_ctx *ctx, int tid ); void * lfq_dequeue(struct lfq_ctx *ctx ); diff --git a/test_multithread.c b/test_multithread.c index 68537b8..7c239b8 100644 --- a/test_multithread.c +++ b/test_multithread.c @@ -38,19 +38,19 @@ struct user_data{ THREAD_FN addq( void * data ) { struct lfq_ctx * ctx = data; - struct user_data * p=(struct user_data *)0xff; - long i; int ret = 0; - for ( i = 0 ; i < 500000 ; i++) { - p = malloc(sizeof(struct user_data)); + long added; + for (added = 0 ; added < 500000 ; added++) { + struct user_data * p = malloc(sizeof(struct 
user_data)); p->data=SOMEID; if ( ( ret = lfq_enqueue(ctx,p) ) != 0 ) { printf("lfq_enqueue failed, reason:%s\n", strerror(-ret)); + ATOMIC_ADD64(&cn_added, added); ATOMIC_SUB(&cn_producer, 1); return 0; } - ATOMIC_ADD64(&cn_added, 1); } + ATOMIC_ADD64(&cn_added, added); ATOMIC_SUB(&cn_producer, 1); printf("Producer thread [%lu] exited! Still %d running...\n",THREAD_ID(), cn_producer); return 0; @@ -60,21 +60,28 @@ THREAD_FN delq(void * data) { struct lfq_ctx * ctx = data; struct user_data * p; int tid = ATOMIC_ADD(&cn_t, 1); - - while(ctx->count || cn_producer) { + + long deleted = 0; + while(1) { p = lfq_dequeue_tid(ctx, tid); if (p) { if (p->data!=SOMEID){ printf("data wrong!!\n"); exit(1); } - + free(p); - ATOMIC_ADD64(&cn_deled, 1); - } else - THREAD_YIELD(); // queue is empty, release CPU slice + deleted++; + } else { + if (ctx->count || cn_producer) + THREAD_YIELD(); // queue is empty, release CPU slice + else + break; // queue is empty and no more producers + } } + ATOMIC_ADD64(&cn_deled, deleted); + // p = lfq_dequeue_tid(ctx, tid); printf("Consumer thread [%lu] exited %d\n",THREAD_ID(),cn_producer); return 0; @@ -115,12 +122,15 @@ int main() { for ( i = 0 ; i < MAX_CONSUMER ; i++ ) THREAD_WAIT(thread_d[i]); - - printf("Total push %"PRId64" elements, pop %"PRId64" elements.\n", cn_added, cn_deled ); + + long freecount = lfg_count_freelist(&ctx); + int clean = lfq_clean(&ctx); + + printf("Total push %"PRId64" elements, pop %"PRId64" elements. freelist=%ld, clean = %d\n", cn_added, cn_deled, freecount, clean); if ( cn_added == cn_deled ) printf("Test PASS!!\n"); else printf("Test Failed!!\n"); - lfq_clean(&ctx); + return (cn_added != cn_deled); }