Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 31 additions & 29 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,34 +1,36 @@
CC=gcc
CFLAGS=-std=gnu99 -O3 -Wall -Wextra -g
LDFLAGS=-g
LOADLIBS=-lpthread

all : bin/test_p1c1 bin/test_p4c4 bin/test_p100c10 bin/test_p10c100 bin/example

bin/example: example.c liblfq.so.1.0.0
gcc $(CFLAGS) $(LDFLAGS) example.c lfq.c -o bin/example -lpthread

bin/test_p1c1: liblfq.so.1.0.0 test_multithread.c
gcc $(CFLAGS) $(LDFLAGS) test_multithread.c -o bin/test_p1c1 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=1 -D MAX_CONSUMER=1

bin/test_p4c4: liblfq.so.1.0.0 test_multithread.c
gcc $(CFLAGS) $(LDFLAGS) test_multithread.c -o bin/test_p4c4 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=4 -D MAX_CONSUMER=4

bin/test_p100c10: liblfq.so.1.0.0 test_multithread.c
gcc $(CFLAGS) $(LDFLAGS) test_multithread.c -o bin/test_p100c10 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=100 -D MAX_CONSUMER=10

bin/test_p10c100: liblfq.so.1.0.0 test_multithread.c
gcc $(CFLAGS) $(LDFLAGS) test_multithread.c -o bin/test_p10c100 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=10 -D MAX_CONSUMER=100

liblfq.so.1.0.0: lfq.c lfq.h cross-platform.h
gcc $(CFLAGS) $(CPPFLAGS) -c lfq.c # -fno-pie for static linking?
ar rcs liblfq.a lfq.o
gcc $(CFLAGS) $(CPPFLAGS) -fPIC -c lfq.c
gcc $(LDFLAGS) -shared -o liblfq.so.1.0.0 lfq.o

test: bin/test_p1c1 bin/test_p4c4 bin/test_p100c10 bin/test_p10c100
$(TESTWRAPPER) bin/test_p1c1
$(TESTWRAPPER) bin/test_p4c4
$(TESTWRAPPER) bin/test_p100c10
$(TESTWRAPPER) bin/test_p10c100

all : p1c1 p4c4 p100c10 p10c100 example

example: liblfq
gcc -std=c99 -g example.c lfq.c -o bin/example -lpthread

p1c1: liblfq
gcc -std=c99 -g test_multithread.c -o bin/test_p1c1 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=1 -D MAX_CONSUMER=1

p4c4: liblfq
gcc -std=c99 -g test_multithread.c -o bin/test_p4c4 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=4 -D MAX_CONSUMER=4

p100c10: liblfq
gcc -std=c99 -g test_multithread.c -o bin/test_p100c10 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=100 -D MAX_CONSUMER=10

p10c100: liblfq
gcc -std=c99 -g test_multithread.c -o bin/test_p10c100 -L. -Wl,-Bstatic -llfq -Wl,-Bdynamic -lpthread -D MAX_PRODUCER=10 -D MAX_CONSUMER=100

liblfq: lfq.c
@gcc -std=c99 -c -O3 lfq.c -lpthread
@ar rcs liblfq.a lfq.o
@gcc -std=c99 -fPIC -c lfq.c
@gcc -shared -o liblfq.so.1.0.0 lfq.o

test:
@bin/test_p1c1
@bin/test_p4c4
@bin/test_p100c10
@bin/test_p10c100

# `clean` names an action, not a file. Declaring it phony keeps the rule
# running even if a file or directory called "clean" appears in the tree.
.PHONY: clean

# Remove every build product: object files, everything under bin/, and both
# the shared and static library artifacts.
clean:
	rm -rf *.o bin/* liblfq.so.1.0.0 liblfq.a
11 changes: 8 additions & 3 deletions cross-platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,20 @@
#define ATOMIC_SET __sync_lock_test_and_set
#define ATOMIC_RELEASE __sync_lock_release

#if defined __GNUC__ || defined __CYGWIN__ || defined __MINGW32__
#if defined __GNUC__
#define ATOMIC_SUB __sync_sub_and_fetch
#define ATOMIC_SUB64 ATOMIC_SUB
#define CAS __sync_bool_compare_and_swap
#define XCHG __sync_lock_test_and_set // yes really. The 2nd arg is limited to 1 on machines with TAS but not XCHG. On x86 it's an arbitrary value
#define ATOMIC_ADD __sync_add_and_fetch
#define ATOMIC_ADD64 ATOMIC_ADD
#define mb __sync_synchronize
#define lmb() asm volatile( "lfence" )
#define smb() asm volatile( "sfence" )
#if defined(__x86_64__) || defined(__i386)
// #define lmb() asm volatile( "lfence" )
// #define smb() asm volatile( "sfence" )
#define lmb() asm volatile("":::"memory") // compiler barrier only. runtime reordering already impossible on x86
#define smb() asm volatile("":::"memory")
#endif // else no definition

// thread
#include <pthread.h>
Expand Down
33 changes: 30 additions & 3 deletions lfq.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "cross-platform.h"
#include "lfq.h"
#include <assert.h>
#include <errno.h>
#define MAXFREE 150

static
int inHP(struct lfq_ctx *ctx, struct lfq_node * lfn) {
for ( int i = 0 ; i < ctx->MAXHPSIZE ; i++ ) {
lmb();
Expand All @@ -12,6 +14,7 @@ int inHP(struct lfq_ctx *ctx, struct lfq_node * lfn) {
return 0;
}

static
void enpool(struct lfq_ctx *ctx, struct lfq_node * lfn) {
volatile struct lfq_node * p;
do {
Expand All @@ -20,6 +23,7 @@ void enpool(struct lfq_ctx *ctx, struct lfq_node * lfn) {
p->free_next = lfn;
}

static
void free_pool(struct lfq_ctx *ctx, bool freeall ) {
if (!CAS(&ctx->is_freeing, 0, 1))
return; // this pool free is not support multithreading.
Expand All @@ -37,20 +41,23 @@ void free_pool(struct lfq_ctx *ctx, bool freeall ) {
smb();
}

static
void safe_free(struct lfq_ctx *ctx, struct lfq_node * lfn) {
if (lfn->can_free && !inHP(ctx,lfn)) {
// free is not thread safety
// free is not thread-safe
if (CAS(&ctx->is_freeing, 0, 1)) {
free(lfn);
lfn->next = (void*)-1; // poison the pointer to detect use-after-free
free(lfn); // we got the lock; actually free
ctx->is_freeing = false;
smb();
} else
} else // we didn't get the lock; only add to a freelist
enpool(ctx, lfn);
} else
enpool(ctx, lfn);
free_pool(ctx, false);
}

static
int alloc_tid(struct lfq_ctx *ctx) {
for (int i = 0; i < ctx->MAXHPSIZE; i++)
if (ctx->tid_map[i] == 0)
Expand All @@ -60,6 +67,7 @@ int alloc_tid(struct lfq_ctx *ctx) {
return -1;
}

/*
 * Release a thread-id slot by clearing its entry in ctx->tid_map.
 *
 * alloc_tid scans tid_map[0 .. MAXHPSIZE-1] for entries equal to 0 and
 * returns -1 when it finds none, so a zeroed entry is what it looks for.
 *
 * ctx: queue context that owns tid_map.
 * tid: slot index to release. Values outside [0, ctx->MAXHPSIZE), including
 *      the -1 failure value from alloc_tid, are ignored; writing through
 *      them would land outside the range alloc_tid ever indexes.
 */
static
void free_tid(struct lfq_ctx *ctx, int tid) {
	if (tid < 0 || tid >= ctx->MAXHPSIZE)
		return; // not a slot alloc_tid could have handed out
	ctx->tid_map[tid] = 0;
}
Expand All @@ -84,6 +92,24 @@ int lfq_init(struct lfq_ctx *ctx, int max_consume_thread) {
return 0;
}


long lfg_count_freelist(const struct lfq_ctx *ctx) {
long count=0;
struct lfq_node *p = (struct lfq_node *)ctx->fph; // non-volatile
while(p) {
count++;
p = p->free_next;
}
/*
while(pn = p->free_next) {
free(p);
p = pn;
count++;
}
*/
return count;
}

int lfq_clean(struct lfq_ctx *ctx){
if ( ctx->tail && ctx->head ) { // if have data in queue
struct lfq_node *tmp;
Expand Down Expand Up @@ -141,6 +167,7 @@ void * lfq_dequeue_tid(struct lfq_ctx *ctx, int tid ) {
ctx->HP[tid] = 0;
return 0;
}
assert(pn != (void*)-1 && "read an already-freed node");
} while( ! CAS(&ctx->head, p, pn) );
mb();
ctx->HP[tid] = 0;
Expand Down
9 changes: 7 additions & 2 deletions lfq.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#define __LFQ_H__
#include "cross-platform.h"

#include <stdalign.h> // C11

struct lfq_node{
void * data;
struct lfq_node * volatile next;
Expand All @@ -10,19 +12,22 @@ struct lfq_node{
};

struct lfq_ctx{
volatile struct lfq_node * volatile head;
volatile struct lfq_node * volatile tail;
alignas(64) volatile struct lfq_node * volatile head;
int volatile count;
volatile struct lfq_node * * HP;
volatile int * tid_map;
int volatile is_freeing;
volatile struct lfq_node * volatile fph; // free pool head
volatile struct lfq_node * volatile fpt; // free pool tail
int MAXHPSIZE;

alignas(64) volatile struct lfq_node * volatile tail; // in another cache line to avoid contention
};

int lfq_init(struct lfq_ctx *ctx, int max_consume_thread);
int lfq_clean(struct lfq_ctx *ctx);
long lfg_count_freelist(const struct lfq_ctx *ctx);

int lfq_enqueue(struct lfq_ctx *ctx, void * data);
void * lfq_dequeue_tid(struct lfq_ctx *ctx, int tid );
void * lfq_dequeue(struct lfq_ctx *ctx );
Expand Down
38 changes: 24 additions & 14 deletions test_multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ struct user_data{

THREAD_FN addq( void * data ) {
struct lfq_ctx * ctx = data;
struct user_data * p=(struct user_data *)0xff;
long i;
int ret = 0;
for ( i = 0 ; i < 500000 ; i++) {
p = malloc(sizeof(struct user_data));
long added;
for (added = 0 ; added < 500000 ; added++) {
struct user_data * p = malloc(sizeof(struct user_data));
p->data=SOMEID;
if ( ( ret = lfq_enqueue(ctx,p) ) != 0 ) {
printf("lfq_enqueue failed, reason:%s\n", strerror(-ret));
ATOMIC_ADD64(&cn_added, added);
ATOMIC_SUB(&cn_producer, 1);
return 0;
}
ATOMIC_ADD64(&cn_added, 1);
}
ATOMIC_ADD64(&cn_added, added);
ATOMIC_SUB(&cn_producer, 1);
printf("Producer thread [%lu] exited! Still %d running...\n",THREAD_ID(), cn_producer);
return 0;
Expand All @@ -60,21 +60,28 @@ THREAD_FN delq(void * data) {
struct lfq_ctx * ctx = data;
struct user_data * p;
int tid = ATOMIC_ADD(&cn_t, 1);

while(ctx->count || cn_producer) {

long deleted = 0;
while(1) {
p = lfq_dequeue_tid(ctx, tid);
if (p) {
if (p->data!=SOMEID){
printf("data wrong!!\n");
exit(1);
}

free(p);
ATOMIC_ADD64(&cn_deled, 1);
} else
THREAD_YIELD(); // queue is empty, release CPU slice
deleted++;
} else {
if (ctx->count || cn_producer)
THREAD_YIELD(); // queue is empty, release CPU slice
else
break; // queue is empty and no more producers
}
}

ATOMIC_ADD64(&cn_deled, deleted);

// p = lfq_dequeue_tid(ctx, tid);
printf("Consumer thread [%lu] exited %d\n",THREAD_ID(),cn_producer);
return 0;
Expand Down Expand Up @@ -115,12 +122,15 @@ int main() {

for ( i = 0 ; i < MAX_CONSUMER ; i++ )
THREAD_WAIT(thread_d[i]);

printf("Total push %"PRId64" elements, pop %"PRId64" elements.\n", cn_added, cn_deled );

long freecount = lfg_count_freelist(&ctx);
int clean = lfq_clean(&ctx);

printf("Total push %"PRId64" elements, pop %"PRId64" elements. freelist=%ld, clean = %d\n", cn_added, cn_deled, freecount, clean);
if ( cn_added == cn_deled )
printf("Test PASS!!\n");
else
printf("Test Failed!!\n");
lfq_clean(&ctx);

return (cn_added != cn_deled);
}