Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
*.o
*.so
*.DS_Store
fRanz/inst/librdkafka*
franz/src/librdkafka
librdkafka*
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ services:
- docker

before_install:
- make librdkafka LIBRDKAFKA_CONF=--prefix=../dest
- export LIBRARY_PATH=$LIBRARY_PATH:../dest/lib
- export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../dest/lib
- docker-compose up -d
- sleep 20
- cd fRanz

language: r
r:
- release
cache: packages

os:
- linux
- osx
cache: packages
18 changes: 12 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
PACKAGE = fRanz
INSTALLDIR = $(HOME)/.$(PACKAGE)/local
OS = $(shell uname -s)

.PHONY: install smoke test docs roxygen pdf version clean distclean cleanRcpp unlock
.PHONY: install test docs clean distclean cleanRcpp unlock
LIBRDKAFKA_CONF=''

install: unlock clean cleanRcpp
# Install fRanz R package
Expand All @@ -22,16 +20,24 @@ clean:
distclean: clean cleanRcpp

unlock:
# Remove 00LOCK-cpproll directory
# Remove 00LOCK directory
for libpath in $$(Rscript -e "noquote(paste(.libPaths(), collapse = ' '))"); do \
echo "Unlocking $$libpath..." && \
rm -rf $$libpath/00LOCK-$(PACKAGE); \
done

docs roxygen:
docs:
# Regenerate documentation with roxygen
Rscript -e "roxygen2::roxygenize('$(PACKAGE)')"
test:
# Run unit tests
Rscript -e "devtools::test('$(PACKAGE)')"

librdkafka:
wget https://github.com/edenhill/librdkafka/archive/v1.0.0.tar.gz -O librdkafka-1.0.0.tar.gz && \
tar xzf librdkafka-1.0.0.tar.gz && \
cd librdkafka-1.0.0 && \
./configure $(LIBRDKAFKA_CONF)&& \
$(MAKE) && \
$(MAKE) install && \
cd .. && rm -rf librdkafka-1.0.0; \
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
fRanz is an open source R kafka client that allows users to read and write messages from kafka. It leverages the stability and performance of [librdkafka](https://github.com/edenhill/librdkafka) and implements ididiomatic R workflows ontop of it.


## Installation

We're working on it. Currently you need librdkafka as a system available library in order to load the R package. In order to install from source you also need the headers. A make recipe in the top level `make librdkafka` should work for *nix systems.


No attempt has been made for windows compatability.

## Example of sending and reading a message

```r
Expand Down
2 changes: 1 addition & 1 deletion fRanz/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: fRanz
Type: Package
Title: An R Kafka Client
Title: An R Kafka Client.
Version: 0.1.0
Date: 2019-05-13
Authors@R: c(
Expand Down
3 changes: 3 additions & 0 deletions fRanz/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

export(GetRdConsumer)
export(GetRdProducer)
export(KafkaBroker)
export(KafkaConsume)
export(KafkaConsumer)
export(KafkaProduce)
export(KafkaProducer)
export(RdSubscribe)
importFrom(R6,R6Class)
importFrom(Rcpp,sourceCpp)
Expand Down
3 changes: 2 additions & 1 deletion fRanz/R/KafkaBroker.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#' @title KafkaBroker
#' @name Kafka Broker
#' @name KafkaBroker
#' @description TDB
#' @export
#' @importFrom R6 R6Class
KafkaBroker <- R6::R6Class(
classname = "KafkaBroker"
Expand Down
1 change: 1 addition & 0 deletions fRanz/R/KafkaConsumer.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#' @title Kakfa Consumer
#' @name KafkaConsumer
#' @description TDB
#' @export
#' @importFrom R6 R6Class
KafkaConsumer <- R6::R6Class(
classname = "KafkaConsumer"
Expand Down
1 change: 1 addition & 0 deletions fRanz/R/KafkaProducer.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#' @title Kakfa Producer
#' @name KafkaProducer
#' @description TDB
#' @export
#' @importFrom R6 R6Class
KafkaProducer <- R6::R6Class(
classname = "KafkaProducer"
Expand Down
11 changes: 6 additions & 5 deletions fRanz/R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#' @title GetRdConsumer
#' @name GetRdConsumer
#' @description Creates an Rcpp::XPtr<RdKafka::Consumer>. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
#' @description Creates an Rcpp::XPtr<RdKafka::Consumer>. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
#' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer
#' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys.
#' @return a Rcpp::XPtr<RdKafka::Consumer>
Expand All @@ -26,18 +26,19 @@ RdSubscribe <- function(consumerPtr, Rtopics) {

#' @title KafkaConsume
#' @name KafkaConsume
#' @description
#' @description A method to that takes a consumer pointer and returns at most the specified number of results, unless the timeout is reached
#' @param consumerPtr a reference to a Rcpp::XPtr<RdKafka::KafkaConsumer>
#' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum
#' @param timeout the timeout in milliseconds. Default is 10000
#' @return a list of length numResults with values list(key=key,value=value)
#' @export
KafkaConsume <- function(consumerPtr, numResults) {
.Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults)
KafkaConsume <- function(consumerPtr, numResults, timeout = 10000L) {
.Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults, timeout)
}

#' @title GetRdProducer
#' @name GetRdProducer
#' @description Creates an Rcpp::XPtr<RdKafka::Producer>. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
#' @description Creates an Rcpp::XPtr<RdKafka::Producer>. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
#' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer
#' @param values a character vector indicating option values to parameterize the RdKafka::Producer. Must be of same length as keys.
#' @return a Rcpp::XPtr<RdKafka::Producer>
Expand Down
2 changes: 1 addition & 1 deletion fRanz/man/GetRdConsumer.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion fRanz/man/GetRdProducer.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions fRanz/man/Kafka-Broker.Rd → fRanz/man/KafkaBroker.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions fRanz/man/KafkaConsume.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 5 additions & 22 deletions fRanz/src/Makevars
Original file line number Diff line number Diff line change
@@ -1,23 +1,6 @@
INSTALLDIR = $(HOME)/.fRanz/librdkafka
LIBRDKAFKADIR = $(PWD)/../inst/librdkafka-0.11.6

PKG_LIBS = -L$(INSTALLDIR)/src-cpp -lrdkafka++
PKG_CXXFLAGS = -std=c++11 -I$(INSTALLDIR)/src-cpp

.PHONY: all install_librdkadka

all: install_librdkadka

install_librdkadka:
if [ ! -s $(INSTALLDIR)/src-cpp/librdkafka++.a ] ; \
then \
mkdir -p $(INSTALLDIR) && \
cd $(shell dirname $(LIBRDKAFKADIR)) && \
tar xzf $(LIBRDKAFKADIR).tar.gz && \
cd $(LIBRDKAFKADIR) && \
./configure && \
$(MAKE) && \
$(MAKE) install && \
mv * $(INSTALLDIR) ; \
fi
LIBRDKAFKA_LIBS = -L../../dest -lrdkafka++
LIBRDKAFKA_CXXFLAGS = -I../../dest/include -lrdkafka++

CXX_STD=CXX11
PKG_CXXFLAGS = $(LIBRDKAFKA_CXXFLAGS)
PKG_LIBS = $(LIBRDKAFKA_LIBS)
9 changes: 5 additions & 4 deletions fRanz/src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ BEGIN_RCPP
END_RCPP
}
// KafkaConsume
Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults);
RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP) {
Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout);
RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP, SEXP timeoutSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< SEXP >::type consumerPtr(consumerPtrSEXP);
Rcpp::traits::input_parameter< int >::type numResults(numResultsSEXP);
rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults));
Rcpp::traits::input_parameter< int >::type timeout(timeoutSEXP);
rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults, timeout));
return rcpp_result_gen;
END_RCPP
}
Expand Down Expand Up @@ -72,7 +73,7 @@ END_RCPP
static const R_CallMethodDef CallEntries[] = {
{"_fRanz_GetRdConsumer", (DL_FUNC) &_fRanz_GetRdConsumer, 2},
{"_fRanz_RdSubscribe", (DL_FUNC) &_fRanz_RdSubscribe, 2},
{"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 2},
{"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 3},
{"_fRanz_GetRdProducer", (DL_FUNC) &_fRanz_GetRdProducer, 2},
{"_fRanz_KafkaProduce", (DL_FUNC) &_fRanz_KafkaProduce, 5},
{NULL, NULL, 0}
Expand Down
19 changes: 6 additions & 13 deletions fRanz/src/consumer.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
#include <rdkafkacpp.h>
#include <librdkafka/rdkafkacpp.h>
#include <Rcpp.h>
#include "utils.h"
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <thread>
#include <chrono>
#include <cstring>

////////////////////////////////////////////////////////////////////////////////////////
//' @title GetRdConsumer
//' @name GetRdConsumer
//' @description Creates an Rcpp::XPtr<RdKafka::Consumer>. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
//' @description Creates an Rcpp::XPtr<RdKafka::Consumer>. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
//' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer
//' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys.
//' @return a Rcpp::XPtr<RdKafka::Consumer>
Expand Down Expand Up @@ -52,18 +44,19 @@ int RdSubscribe(SEXP consumerPtr, const Rcpp::StringVector Rtopics) {

//' @title KafkaConsume
//' @name KafkaConsume
//' @description
//' @description A method to that takes a consumer pointer and returns at most the specified number of results, unless the timeout is reached
//' @param consumerPtr a reference to a Rcpp::XPtr<RdKafka::KafkaConsumer>
//' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum
//' @param timeout the timeout in milliseconds. Default is 10000
//' @return a list of length numResults with values list(key=key,value=value)
//' @export
// [[Rcpp::export]]
Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults) {
Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout = 10000) {
Rcpp::XPtr<RdKafka::KafkaConsumer> consumer(consumerPtr);

Rcpp::List messages(numResults);
for(int i = 0; i < numResults; i++) {
RdKafka::Message *msg = consumer->consume(10000);
RdKafka::Message *msg = consumer->consume(timeout);
switch(msg->err()){
case RdKafka::ERR_NO_ERROR: {
Rcpp::List message = Rcpp::List::create(Rcpp::Named("key") = *msg->key(),
Expand Down
10 changes: 2 additions & 8 deletions fRanz/src/producer.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
#include <rdkafkacpp.h>
#include <librdkafka/rdkafkacpp.h>
#include <Rcpp.h>
#include "utils.h"
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>

//' @title GetRdProducer
//' @name GetRdProducer
//' @description Creates an Rcpp::XPtr<RdKafka::Producer>. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
//' @description Creates an Rcpp::XPtr<RdKafka::Producer>. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
//' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer
//' @param values a character vector indicating option values to parameterize the RdKafka::Producer. Must be of same length as keys.
//' @return a Rcpp::XPtr<RdKafka::Producer>
Expand Down
2 changes: 1 addition & 1 deletion fRanz/src/utils.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include <Rcpp.h>
#include <rdkafkacpp.h>
#include <librdkafka/rdkafkacpp.h>



Expand Down
2 changes: 1 addition & 1 deletion fRanz/src/utils.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <Rcpp.h>
#include <rdkafkacpp.h>
#include <librdkafka/rdkafkacpp.h>

RdKafka::Conf* MakeKafkaConfig(Rcpp::StringVector keys, Rcpp::StringVector values);
2 changes: 2 additions & 0 deletions fRanz/tests/testthat/test-KafkaConsumer.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ context("KafkaConsumer and KafkaProducer")


testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 message",{
testthat::skip_on_cran()
## Standard Set Up
topic <- uuid::UUIDgenerate()
group <- uuid::UUIDgenerate()
Expand All @@ -25,6 +26,7 @@ testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 me


testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming random number of messages",{
testthat::skip_on_cran()
## Standard Set Up
topic <- uuid::UUIDgenerate()
group <- uuid::UUIDgenerate()
Expand Down