diff --git a/.gitignore b/.gitignore index 05343b2..d1ca494 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ *.o *.so *.DS_Store -fRanz/inst/librdkafka* +franz/src/librdkafka +librdkafka* \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 34c4509..59f85f7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,14 +2,14 @@ services: - docker before_install: + - make librdkafka LIBRDKAFKA_CONF=--prefix=../dest + - export LIBRARY_PATH=$LIBRARY_PATH:../dest/lib + - export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../dest/lib - docker-compose up -d - sleep 20 + - cd fRanz language: r r: - release -cache: packages - -os: - - linux - - osx +cache: packages \ No newline at end of file diff --git a/Makefile b/Makefile index a778cb6..a082350 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,6 @@ PACKAGE = fRanz -INSTALLDIR = $(HOME)/.$(PACKAGE)/local -OS = $(shell uname -s) - -.PHONY: install smoke test docs roxygen pdf version clean distclean cleanRcpp unlock +.PHONY: install test docs clean distclean cleanRcpp unlock +LIBRDKAFKA_CONF='' install: unlock clean cleanRcpp # Install fRanz R package @@ -22,16 +20,24 @@ clean: distclean: clean cleanRcpp unlock: - # Remove 00LOCK-cpproll directory + # Remove 00LOCK directory for libpath in $$(Rscript -e "noquote(paste(.libPaths(), collapse = ' '))"); do \ echo "Unlocking $$libpath..." && \ rm -rf $$libpath/00LOCK-$(PACKAGE); \ done -docs roxygen: +docs: # Regenerate documentation with roxygen Rscript -e "roxygen2::roxygenize('$(PACKAGE)')" test: # Run unit tests Rscript -e "devtools::test('$(PACKAGE)')" +librdkafka: + wget https://github.com/edenhill/librdkafka/archive/v1.0.0.tar.gz -O librdkafka-1.0.0.tar.gz && \ + tar xzf librdkafka-1.0.0.tar.gz && \ + cd librdkafka-1.0.0 && \ + ./configure $(LIBRDKAFKA_CONF)&& \ + $(MAKE) && \ + $(MAKE) install && \ + cd .. 
&& rm -rf librdkafka-1.0.0; \ \ No newline at end of file diff --git a/README.md b/README.md index 51e71e0..7908143 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,13 @@ fRanz is an open source R kafka client that allows users to read and write messages from kafka. It leverages the stability and performance of [librdkafka](https://github.com/edenhill/librdkafka) and implements ididiomatic R workflows ontop of it. +## Installation + +We're working on it. Currently you need librdkafka as a system available library in order to load the R package. In order to install from source you also need the headers. A make recipe in the top level `make librdkafka` should work for *nix systems. + + +No attempt has been made at Windows compatibility. + ## Example of sending and reading a message ```r diff --git a/fRanz/DESCRIPTION b/fRanz/DESCRIPTION index bb0c8d4..0cd4524 100644 --- a/fRanz/DESCRIPTION +++ b/fRanz/DESCRIPTION @@ -1,6 +1,6 @@ Package: fRanz Type: Package -Title: An R Kafka Client +Title: An R Kafka Client. 
Version: 0.1.0 Date: 2019-05-13 Authors@R: c( diff --git a/fRanz/NAMESPACE b/fRanz/NAMESPACE index 00f777e..e04d251 100644 --- a/fRanz/NAMESPACE +++ b/fRanz/NAMESPACE @@ -2,8 +2,11 @@ export(GetRdConsumer) export(GetRdProducer) +export(KafkaBroker) export(KafkaConsume) +export(KafkaConsumer) export(KafkaProduce) +export(KafkaProducer) export(RdSubscribe) importFrom(R6,R6Class) importFrom(Rcpp,sourceCpp) diff --git a/fRanz/R/KafkaBroker.R b/fRanz/R/KafkaBroker.R index cbe5500..52991a7 100644 --- a/fRanz/R/KafkaBroker.R +++ b/fRanz/R/KafkaBroker.R @@ -1,6 +1,7 @@ #' @title KafkaBroker -#' @name Kafka Broker +#' @name KafkaBroker #' @description TDB +#' @export #' @importFrom R6 R6Class KafkaBroker <- R6::R6Class( classname = "KafkaBroker" diff --git a/fRanz/R/KafkaConsumer.R b/fRanz/R/KafkaConsumer.R index 532f331..41885b3 100644 --- a/fRanz/R/KafkaConsumer.R +++ b/fRanz/R/KafkaConsumer.R @@ -1,6 +1,7 @@ #' @title Kakfa Consumer #' @name KafkaConsumer #' @description TDB +#' @export #' @importFrom R6 R6Class KafkaConsumer <- R6::R6Class( classname = "KafkaConsumer" diff --git a/fRanz/R/KafkaProducer.R b/fRanz/R/KafkaProducer.R index ed8fcd2..dcf11d4 100644 --- a/fRanz/R/KafkaProducer.R +++ b/fRanz/R/KafkaProducer.R @@ -1,6 +1,7 @@ #' @title Kakfa Producer #' @name KafkaProducer #' @description TDB +#' @export #' @importFrom R6 R6Class KafkaProducer <- R6::R6Class( classname = "KafkaProducer" diff --git a/fRanz/R/RcppExports.R b/fRanz/R/RcppExports.R index 9b503d7..d179f96 100644 --- a/fRanz/R/RcppExports.R +++ b/fRanz/R/RcppExports.R @@ -3,7 +3,7 @@ #' @title GetRdConsumer #' @name GetRdConsumer -#' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +#' @description Creates an Rcpp::XPtr. 
For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} #' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer #' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys. #' @return a Rcpp::XPtr @@ -26,18 +26,19 @@ RdSubscribe <- function(consumerPtr, Rtopics) { #' @title KafkaConsume #' @name KafkaConsume -#' @description +#' @description A method to that takes a consumer pointer and returns at most the specified number of results, unless the timeout is reached #' @param consumerPtr a reference to a Rcpp::XPtr #' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum +#' @param timeout the timeout in milliseconds. Default is 10000 #' @return a list of length numResults with values list(key=key,value=value) #' @export -KafkaConsume <- function(consumerPtr, numResults) { - .Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults) +KafkaConsume <- function(consumerPtr, numResults, timeout = 10000L) { + .Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults, timeout) } #' @title GetRdProducer #' @name GetRdProducer -#' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +#' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} #' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer #' @param values a character vector indicating option values to parameterize the RdKafka::Producer. Must be of same length as keys. 
#' @return a Rcpp::XPtr diff --git a/fRanz/man/GetRdConsumer.Rd b/fRanz/man/GetRdConsumer.Rd index 3abe841..8cb2e5c 100644 --- a/fRanz/man/GetRdConsumer.Rd +++ b/fRanz/man/GetRdConsumer.Rd @@ -15,5 +15,5 @@ GetRdConsumer(keys, values) a Rcpp::XPtr } \description{ -Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} } diff --git a/fRanz/man/GetRdProducer.Rd b/fRanz/man/GetRdProducer.Rd index cf02b05..a7f11df 100644 --- a/fRanz/man/GetRdProducer.Rd +++ b/fRanz/man/GetRdProducer.Rd @@ -15,5 +15,5 @@ GetRdProducer(keys, values) a Rcpp::XPtr } \description{ -Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} } diff --git a/fRanz/man/Kafka-Broker.Rd b/fRanz/man/KafkaBroker.Rd similarity index 86% rename from fRanz/man/Kafka-Broker.Rd rename to fRanz/man/KafkaBroker.Rd index 31b75f4..5491b47 100644 --- a/fRanz/man/Kafka-Broker.Rd +++ b/fRanz/man/KafkaBroker.Rd @@ -1,8 +1,7 @@ % Generated by roxygen2: do not edit by hand % Please edit documentation in R/KafkaBroker.R \docType{data} -\name{Kafka Broker} -\alias{Kafka Broker} +\name{KafkaBroker} \alias{KafkaBroker} \title{KafkaBroker} \format{An object of class \code{R6ClassGenerator} of length 24.} diff --git a/fRanz/man/KafkaConsume.Rd b/fRanz/man/KafkaConsume.Rd index 46856fa..57532a8 100644 --- a/fRanz/man/KafkaConsume.Rd +++ b/fRanz/man/KafkaConsume.Rd @@ -4,16 +4,18 @@ \alias{KafkaConsume} \title{KafkaConsume} \usage{ -KafkaConsume(consumerPtr, numResults) +KafkaConsume(consumerPtr, numResults, timeout = 10000L) } \arguments{ \item{consumerPtr}{a reference to a Rcpp::XPtr} \item{numResults}{how 
many results should be consumed before returning. Will return early if offset is at maximum} + +\item{timeout}{the timeout in milliseconds. Default is 10000} } \value{ a list of length numResults with values list(key=key,value=value) } \description{ -KafkaConsume +A method to that takes a consumer pointer and returns at most the specified number of results, unless the timeout is reached } diff --git a/fRanz/src/Makevars b/fRanz/src/Makevars index e557180..355103a 100644 --- a/fRanz/src/Makevars +++ b/fRanz/src/Makevars @@ -1,23 +1,6 @@ -INSTALLDIR = $(HOME)/.fRanz/librdkafka -LIBRDKAFKADIR = $(PWD)/../inst/librdkafka-0.11.6 - -PKG_LIBS = -L$(INSTALLDIR)/src-cpp -lrdkafka++ -PKG_CXXFLAGS = -std=c++11 -I$(INSTALLDIR)/src-cpp - -.PHONY: all install_librdkadka - -all: install_librdkadka - -install_librdkadka: - if [ ! -s $(INSTALLDIR)/src-cpp/librdkafka++.a ] ; \ - then \ - mkdir -p $(INSTALLDIR) && \ - cd $(shell dirname $(LIBRDKAFKADIR)) && \ - tar xzf $(LIBRDKAFKADIR).tar.gz && \ - cd $(LIBRDKAFKADIR) && \ - ./configure && \ - $(MAKE) && \ - $(MAKE) install && \ - mv * $(INSTALLDIR) ; \ - fi +LIBRDKAFKA_LIBS = -L../../dest -lrdkafka++ +LIBRDKAFKA_CXXFLAGS = -I../../dest/include -lrdkafka++ +CXX_STD=CXX11 +PKG_CXXFLAGS = $(LIBRDKAFKA_CXXFLAGS) +PKG_LIBS = $(LIBRDKAFKA_LIBS) \ No newline at end of file diff --git a/fRanz/src/RcppExports.cpp b/fRanz/src/RcppExports.cpp index 912569c..b608c23 100644 --- a/fRanz/src/RcppExports.cpp +++ b/fRanz/src/RcppExports.cpp @@ -30,14 +30,15 @@ BEGIN_RCPP END_RCPP } // KafkaConsume -Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults); -RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP) { +Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout); +RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP, SEXP timeoutSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; Rcpp::traits::input_parameter< SEXP >::type 
consumerPtr(consumerPtrSEXP); Rcpp::traits::input_parameter< int >::type numResults(numResultsSEXP); - rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults)); + Rcpp::traits::input_parameter< int >::type timeout(timeoutSEXP); + rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults, timeout)); return rcpp_result_gen; END_RCPP } @@ -72,7 +73,7 @@ END_RCPP static const R_CallMethodDef CallEntries[] = { {"_fRanz_GetRdConsumer", (DL_FUNC) &_fRanz_GetRdConsumer, 2}, {"_fRanz_RdSubscribe", (DL_FUNC) &_fRanz_RdSubscribe, 2}, - {"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 2}, + {"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 3}, {"_fRanz_GetRdProducer", (DL_FUNC) &_fRanz_GetRdProducer, 2}, {"_fRanz_KafkaProduce", (DL_FUNC) &_fRanz_KafkaProduce, 5}, {NULL, NULL, 0} diff --git a/fRanz/src/consumer.cpp b/fRanz/src/consumer.cpp index a9de7c7..6da5113 100644 --- a/fRanz/src/consumer.cpp +++ b/fRanz/src/consumer.cpp @@ -1,19 +1,11 @@ -#include +#include #include #include "utils.h" -#include -#include -#include -#include -#include -#include -#include -#include //////////////////////////////////////////////////////////////////////////////////////// //' @title GetRdConsumer //' @name GetRdConsumer -//' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +//' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} //' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer //' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys. 
//' @return a Rcpp::XPtr @@ -52,18 +44,19 @@ int RdSubscribe(SEXP consumerPtr, const Rcpp::StringVector Rtopics) { //' @title KafkaConsume //' @name KafkaConsume -//' @description +//' @description A method that takes a consumer pointer and returns at most the specified number of results, unless the timeout is reached //' @param consumerPtr a reference to a Rcpp::XPtr //' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum +//' @param timeout the timeout in milliseconds. Default is 10000 //' @return a list of length numResults with values list(key=key,value=value) //' @export // [[Rcpp::export]] -Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults) { +Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout = 10000) { Rcpp::XPtr consumer(consumerPtr); Rcpp::List messages(numResults); for(int i = 0; i < numResults; i++) { - RdKafka::Message *msg = consumer->consume(10000); + RdKafka::Message *msg = consumer->consume(timeout); switch(msg->err()){ case RdKafka::ERR_NO_ERROR: { Rcpp::List message = Rcpp::List::create(Rcpp::Named("key") = *msg->key(), diff --git a/fRanz/src/producer.cpp b/fRanz/src/producer.cpp index 0cd95fa..7f0c05a 100644 --- a/fRanz/src/producer.cpp +++ b/fRanz/src/producer.cpp @@ -1,16 +1,10 @@ -#include +#include #include #include "utils.h" -#include -#include -#include -#include -#include -#include //' @title GetRdProducer //' @name GetRdProducer -//' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +//' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} //' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer //' @param values a character vector indicating option values to parameterize the RdKafka::Producer. 
Must be of same length as keys. //' @return a Rcpp::XPtr diff --git a/fRanz/src/utils.cpp b/fRanz/src/utils.cpp index 8144693..0ba3627 100644 --- a/fRanz/src/utils.cpp +++ b/fRanz/src/utils.cpp @@ -1,5 +1,5 @@ #include -#include +#include diff --git a/fRanz/src/utils.h b/fRanz/src/utils.h index c4b9156..16e9f93 100644 --- a/fRanz/src/utils.h +++ b/fRanz/src/utils.h @@ -1,4 +1,4 @@ #include -#include +#include RdKafka::Conf* MakeKafkaConfig(Rcpp::StringVector keys, Rcpp::StringVector values); \ No newline at end of file diff --git a/fRanz/tests/testthat/test-KafkaConsumer.R b/fRanz/tests/testthat/test-KafkaConsumer.R index 32a7db3..6299eea 100644 --- a/fRanz/tests/testthat/test-KafkaConsumer.R +++ b/fRanz/tests/testthat/test-KafkaConsumer.R @@ -2,6 +2,7 @@ context("KafkaConsumer and KafkaProducer") testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 message",{ + testthat::skip_on_cran() ## Standard Set Up topic <- uuid::UUIDgenerate() group <- uuid::UUIDgenerate() @@ -25,6 +26,7 @@ testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 me testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming random number of messages",{ + testthat::skip_on_cran() ## Standard Set Up topic <- uuid::UUIDgenerate() group <- uuid::UUIDgenerate()