Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions engines/libpmemstream.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* libpmemstream: IO engine that uses pmemstream library to read and write data
*
* Copyright 2022, Intel Corporation
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License,
* version 2 as published by the Free Software Foundation..
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*/

/*
* libpmemstream engine
*
* <description>
*
* general idea is to just read/write using pmemstream; each thread/process would get 2(?) regions
* if an append couldn't be done, we could allocate new region; or switch to a second one for that thread/process;
* and in the background thread (if that's possible with FIO) re-allocate the first region.
*
* To use:
* ioengine=libpmemstream
*
* Other relevant settings:
* ...
*
* See examples/libpmemstream.fio for complete usage example.
*/

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <libpmemstream.h>

#include "../fio.h"
#include "../verify.h"

struct fio_libpmemstream_data {
/* ptr to stream
* ptrs to entry_iterator's
* ptr to region_iterator (?)
*
* and other "global" data
*/

/* we could think of some concurrent data structure to hold info about regions per threads */
};

static int fio_libpmemstream_init(struct thread_data *td)
{
struct thread_options *o = &td->o;

dprint(FD_IO, "o->rw_min_bs %llu\n o->fsync_blocks %u\n o->fdatasync_blocks %u\n",
o->rw_min_bs, o->fsync_blocks, o->fdatasync_blocks);
dprint(FD_IO, "DEBUG fio_libpmemstream_init\n");

/* prep a pmemstream file */

/* read how many threads/processes and initialize multiple regions:
* e.g. 2 regions for each thread/process;
*/
/* io_size == pmemstream size */
/* size / (thread count * 2) == region size */
/* rw_min_bs == stream's block_size */

/* check for a minimum stream size depending on thread count */

return 0;
}

/*
* This is the pmem_map_file execution function, a helper to
* fio_libpmemstream_open_file function.
*/
static int fio_libpmemstream_file(struct thread_data *td, struct fio_file *f /* ... ? */)
{
struct fio_libpmemstream_data *fdd = FILE_ENG_DATA(f);

dprint(FD_IO, "DEBUG fio_libpmemstream_file\n");

/* unmap any existing mapping */
/* make a new pmem2 mapping */

return 0;

err:
/* cleanup on error */

return td->error;
}

static int fio_libpmemstream_open_file(struct thread_data *td, struct fio_file *f)
{
struct fio_libpmemstream_data *fdd;

dprint(FD_IO, "DEBUG fio_libpmemstream_open_file\n");

if (fio_file_open(f))
td_io_close_file(td, f);

fdd = calloc(1, sizeof(*fdd));
if (!fdd) {
return 1;
}
FILE_SET_ENG_DATA(f, fdd);

return fio_libpmemstream_file(td, f /* ... */);
}

static int fio_libpmemstream_prep(struct thread_data *td, struct io_u *io_u)
{
struct fio_file *f = io_u->file;
struct fio_libpmemstream_data *fdd = FILE_ENG_DATA(f);

dprint(FD_IO, "DEBUG fio_libpmemstream_prep\n");

/* block random IO - only seq. workflow makes sense */

return 0;
}

static enum fio_q_status fio_libpmemstream_queue(struct thread_data *td,
struct io_u *io_u)
{
unsigned flags = 0;

fio_ro_check(td, io_u);
io_u->error = 0;

dprint(FD_IO, "DEBUG fio_libpmemstream_queue\n");
dprint(FD_IO, "td->o.odirect %d td->o.sync_io %d\n",
td->o.odirect, td->o.sync_io);
/* map both O_SYNC / DSYNC to not use NODRAIN */
// flags = td->o.sync_io ? 0 : PMEM_F_MEM_NODRAIN;
// flags |= td->o.odirect ? PMEM_F_MEM_NONTEMPORAL : PMEM_F_MEM_TEMPORAL;

switch (io_u->ddir) {
case DDIR_READ:
/* move entry iterator (of the current region) and get the entry */
break;

case DDIR_WRITE:
dprint(FD_IO, "DEBUG ...");

/* append */
/* async_append should be handled differently...? */

/* if there's no more space in the region (for current thread)
* we should switch to the next region (for current thread),
* remove the previous thread and allocate a new one in replace.
*/

case DDIR_SYNC:
case DDIR_DATASYNC:
case DDIR_SYNC_FILE_RANGE:

/* wait commit/persist */

break;
default:
io_u->error = EINVAL;
break;
}

return FIO_Q_COMPLETED;
}

static int fio_libpmemstream_close_file(struct thread_data *td, struct fio_file *f)
{
struct fio_libpmemstream_data *fdd = FILE_ENG_DATA(f);
int ret = 0;

dprint(FD_IO, "DEBUG fio_libpmemstream_close_file\n");

// if (fdd->libpmemstream_ptr)
// ret = /* cleanup libpmemstream */

if (fio_file_open(f))
ret &= generic_close_file(td, f);

FILE_SET_ENG_DATA(f, NULL);
free(fdd);

return ret;
}

FIO_STATIC struct ioengine_ops ioengine = {
.name = "libpmemstream",
.version = FIO_IOOPS_VERSION,
.init = fio_libpmemstream_init,
.prep = fio_libpmemstream_prep,
.queue = fio_libpmemstream_queue,
.open_file = fio_libpmemstream_open_file,
.close_file = fio_libpmemstream_close_file,
.get_file_size = generic_get_file_size,
.prepopulate_file = generic_prepopulate_file,
.flags = FIO_SYNCIO | FIO_RAWIO | FIO_DISKLESSIO | FIO_NOEXTEND |
FIO_NODISKUTIL | FIO_BARRIER | FIO_MEMALIGN,
/* XXX pick proper flags! */
};

static void fio_init fio_libpmemstream_register(void)
{
register_ioengine(&ioengine);
}

static void fio_exit fio_libpmemstream_unregister(void)
{
unregister_ioengine(&ioengine);
}