From b27b901b997a5ba804b408fb65992a54f3da5db9 Mon Sep 17 00:00:00 2001 From: huafengw Date: Wed, 6 Jan 2016 14:26:02 +0800 Subject: [PATCH 1/2] add gearpump benchmark --- gearpump-benchmarks/pom.xml | 116 +++++++++++++++ .../src/main/resources/reference.conf | 5 + .../gearpump/benchmark/Advertising.scala | 138 ++++++++++++++++++ pom.xml | 31 ++++ stream-bench.sh | 61 ++++++++ 5 files changed, 351 insertions(+) create mode 100644 gearpump-benchmarks/pom.xml create mode 100644 gearpump-benchmarks/src/main/resources/reference.conf create mode 100644 gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala diff --git a/gearpump-benchmarks/pom.xml b/gearpump-benchmarks/pom.xml new file mode 100644 index 000000000..b78ec09a3 --- /dev/null +++ b/gearpump-benchmarks/pom.xml @@ -0,0 +1,116 @@ + + + + + yahoo-low-latency-bechmarks + com.yahoo.stream + 0.1.0 + + 4.0.0 + gearpump-benchmarks + + + + com.github.intel-hadoop + gearpump-streaming_${scala.binary.version} + provided + + + com.github.intel-hadoop + gearpump-core_${scala.binary.version} + provided + + + com.github.intel-hadoop + gearpump-external-kafka_${scala.binary.version} + + + org.json + json + + + com.yahoo.stream + streaming-benchmark-common + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + eclipse-add-source + + add-source + + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + ${scala.version} + incremental + true + + -unchecked + -deprecation + -feature + + + -Xms1024m + -Xmx1024m + + + -source + ${java.version} + -target + ${java.version} + -Xlint:all,-serial,-path + + + + + org.apache.maven.plugins + maven-shade-plugin + + true + false + + + + + + + + package + + shade + + + + + + + diff --git a/gearpump-benchmarks/src/main/resources/reference.conf b/gearpump-benchmarks/src/main/resources/reference.conf new file mode 100644 index 000000000..3a245f0f6 --- /dev/null +++ b/gearpump-benchmarks/src/main/resources/reference.conf @@ -0,0 +1,5 @@ +gearpump { + serializers { + "scala.Tuple7" = "" + } +} \ No newline at end of file diff --git a/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala b/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala new file mode 100644 index 000000000..ff42c73bb --- /dev/null +++ b/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala @@ -0,0 +1,138 @@ +package gearpump.benchmark + +import akka.actor.ActorSystem +import benchmark.common.Utils +import benchmark.common.advertising.{CampaignProcessorCommon, RedisAdCampaignCache} +import io.gearpump.Message +import io.gearpump.partitioner.{Partitioner, UnicastPartitioner, HashPartitioner} +import io.gearpump.cluster.UserConfig +import io.gearpump.cluster.client.ClientContext +import io.gearpump.streaming.kafka.lib.StringMessageDecoder +import io.gearpump.streaming.{StreamApplication, Processor} +import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory} +import io.gearpump.streaming.source.{DefaultTimeStampFilter, DataSourceProcessor} +import io.gearpump.streaming.task.{StartTime, Task, TaskContext} +import io.gearpump.util.{AkkaApp, Graph} +import org.json.JSONObject +import io.gearpump.util.Graph.Node +import scala.collection.JavaConverters._ + +object Advertising extends AkkaApp{ + + def application(args: Array[String], system: ActorSystem) : StreamApplication = { + implicit val actorSystem = system + val commonConfig = Utils.findAndReadConfigFile(args(0), true).asInstanceOf[java.util.Map[String, Any]] + + val cores = commonConfig.get("process.cores").asInstanceOf[Int] + val topic = commonConfig.get("kafka.topic").asInstanceOf[String] + val partitions = commonConfig.get("kafka.partitions").asInstanceOf[Int] + val redisHost = commonConfig.get("redis.host").asInstanceOf[String] + + val zookeeperHosts = commonConfig.get("zookeeper.servers").asInstanceOf[java.util.List[String]] match { + case l: java.util.List[String] => l.asScala.toSeq + case other => throw new ClassCastException(other + " not a List[String]") + } + val zookeeperPort = commonConfig.get("zookeeper.port").asInstanceOf[Int] + val zookeeperConnect = zookeeperHosts.map(_ + ":" + zookeeperPort).mkString(",") + + val kafkaHosts = commonConfig.get("kafka.brokers").asInstanceOf[java.util.List[String]] match { + case l: java.util.List[String] => l.asScala.toSeq + case other => throw new ClassCastException(other + " not a List[String]") + } + val kafkaPort = commonConfig.get("kafka.port").asInstanceOf[Int] + val brokerList = kafkaHosts.map(_ + ":" + kafkaPort).mkString(",") + + val parallel = Math.max(1, cores / 7) + val gearConfig = UserConfig.empty.withString("redis.host", redisHost) + val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList) + val source = new KafkaSource(topic, zookeeperConnect, offsetStorageFactory, new StringMessageDecoder, new DefaultTimeStampFilter) + val sourceProcessor = DataSourceProcessor(source, partitions) + val deserializer = Processor[DeserializeTask](parallel) + val filter = Processor[EventFilterTask](parallel) + val projection = Processor[EventProjectionTask](parallel) + val join = Processor[RedisJoinTask](parallel) + val campaign = Processor[CampaignProcessorTask](parallel * 2) + val partitioner = new AdPartitioner + + val graph = Graph(sourceProcessor ~ new HashPartitioner ~> deserializer ~> filter ~> projection ~> join ~ partitioner ~> campaign) + StreamApplication("Advertising", graph, gearConfig) + } + + override def main(akkaConf: Advertising.Config, args: Array[String]): Unit = { + val context = ClientContext(akkaConf) + context.submit(application(args, context.system)) + context.close() + } + + override def help: Unit = {} +} + +class AdPartitioner extends UnicastPartitioner { + override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { + (msg.msg.asInstanceOf[(String, String, String)]._1.hashCode & Integer.MAX_VALUE) % partitionNum + } +} + +class DeserializeTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + override def onNext(msg : Message) : Unit = { + val jsonObj = new JSONObject(msg.msg.asInstanceOf[String]) + val tuple = ( + jsonObj.getString("user_id"), + jsonObj.getString("page_id"), + jsonObj.getString("ad_id"), + jsonObj.getString("ad_type"), + jsonObj.getString("event_type"), + jsonObj.getString("event_time"), + jsonObj.getString("ip_address") + ) + taskContext.output(Message(tuple, msg.timestamp)) + } +} + +class EventFilterTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + override def onNext(msg: Message): Unit = { + val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)] + if(tuple._5 == "view") { + taskContext.output(msg) + } + } +} + +class EventProjectionTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + override def onNext(msg: Message): Unit = { + val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)] + taskContext.output(Message((tuple._3, tuple._6), msg.timestamp)) + } +} + +class RedisJoinTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + private val redisHost = conf.getString("redis.host").get + private val redisAdCampaignCache = new RedisAdCampaignCache(redisHost) + + override def onStart(startTime : StartTime) : Unit = { + redisAdCampaignCache.prepare() + } + + override def onNext(msg: Message): Unit = { + val (ad_id, event_time) = msg.msg.asInstanceOf[(String, String)] + val campaign_id = redisAdCampaignCache.execute(ad_id) + if(campaign_id != null) { + val result = (campaign_id, ad_id, event_time) + taskContext.output(Message(result, msg.timestamp)) + } + } +} + +class CampaignProcessorTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { + private val redisHost = conf.getString("redis.host").get + private val campaignProcessorCommon = new CampaignProcessorCommon(redisHost) + + override def onStart(startTime : StartTime) : Unit = { + campaignProcessorCommon.prepare() + } + + override def onNext(msg: Message): Unit = { + val (campaign_id, _, event_time) = msg.msg.asInstanceOf[(String, String, String)] + campaignProcessorCommon.execute(campaign_id, event_time) + } +} diff --git a/pom.xml b/pom.xml index dec8e7fff..4ee31ea2e 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,20 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t yahoo-low-latency-bechmarks 0.1.0 + + + releases-oss.sonatype.org + Sonatype Releases Repository + http://oss.sonatype.org/content/repositories/releases/ + + + + gearpump-shaded-repo + Vincent at Bintray + http://dl.bintray.com/fvunicorn/maven + + + UTF-8 UTF-8 @@ -19,6 +33,7 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t 0.8.2.1 0.10.1 0.10.0 + 0.7.5 2.10 2.10.4 20141113 @@ -65,6 +80,21 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t flink-connector-kafka ${flink.version} + + com.github.intel-hadoop + gearpump-core_${scala.binary.version} + ${gearpump.version} + + + com.github.intel-hadoop + gearpump-streaming_${scala.binary.version} + ${gearpump.version} + + + com.github.intel-hadoop + gearpump-external-kafka_${scala.binary.version} + ${gearpump.version} + com.yahoo.stream streaming-benchmark-common @@ -128,5 +158,6 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t storm-benchmarks flink-benchmarks spark-benchmarks + gearpump-benchmarks diff --git a/stream-bench.sh b/stream-bench.sh index d2cb3d57c..9c52cb228 100755 --- a/stream-bench.sh +++ b/stream-bench.sh @@ -18,12 +18,14 @@ SCALA_SUB_VERSION=${SCALA_SUB_VERSION:-"4"} STORM_VERSION=${STORM_VERSION:-"0.10.0"} FLINK_VERSION=${FLINK_VERSION:-"0.10.1"} SPARK_VERSION=${SPARK_VERSION:-"1.5.1"} +GEARPUMP_VERSION=${GEARPUMP_VERSION:-"0.7.5"} STORM_DIR="apache-storm-$STORM_VERSION" REDIS_DIR="redis-$REDIS_VERSION" KAFKA_DIR="kafka_$SCALA_BIN_VERSION-$KAFKA_VERSION" FLINK_DIR="flink-$FLINK_VERSION" SPARK_DIR="spark-$SPARK_VERSION-bin-hadoop2.6" +GEARPUMP_DIR="gearpump-$SCALA_BIN_VERSION-$GEARPUMP_VERSION" #Get one of the closet apache mirrors APACHE_MIRROR=$(curl 'https://www.apache.org/dyn/closer.cgi' | grep -o '[^<]*' | sed 's/<[^>]*>//g' | head -1) @@ -150,6 +152,10 @@ run() { SPARK_FILE="$SPARK_DIR.tgz" fetch_untar_file "$SPARK_FILE" "$APACHE_MIRROR/spark/spark-$SPARK_VERSION/$SPARK_FILE" + #Fetch Gearpump + GEARPUMP_FILE="$GEARPUMP_DIR.tar.gz" + fetch_untar_file "$GEARPUMP_FILE" "https://github.com/gearpump/gearpump/releases/download/$GEARPUMP_VERSION/$GEARPUMP_FILE" + elif [ "START_ZK" = "$OPERATION" ]; then start_if_needed dev_zookeeper ZooKeeper 10 "$STORM_DIR/bin/storm" dev-zookeeper @@ -203,6 +209,20 @@ run() { stop_if_needed org.apache.spark.deploy.master.Master SparkMaster stop_if_needed org.apache.spark.deploy.worker.Worker SparkSlave sleep 3 + + elif [ "START_GEARPUMP" = "$OPERATION" ]; + then + start_if_needed io.gearpump.cluster.main.Master GearpumpMaster 3 $GEARPUMP_DIR/bin/master -ip 127.0.0.1 -port 3000 + start_if_needed io.gearpump.cluster.main.Worker GearpumpWorker 3 $GEARPUMP_DIR/bin/worker + start_if_needed io.gearpump.services.main.Services GearpumpDashboard 3 $GEARPUMP_DIR/bin/services + sleep 3 + elif [ "STOP_GEARPUMP" = "$OPERATION" ]; + then + stop_if_needed io.gearpump.services.main.Services GearpumpDashboard + stop_if_needed io.gearpump.cluster.main.Worker GearpumpWorker + stop_if_needed io.gearpump.cluster.main.Master GearpumpMaster + sleep 3 + elif [ "START_LOAD" = "$OPERATION" ]; then cd data @@ -214,6 +234,7 @@ run() { cd data $LEIN run -g --configPath ../$CONF_FILE || true cd .. + elif [ "START_STORM_TOPOLOGY" = "$OPERATION" ]; then "$STORM_DIR/bin/storm" jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTopology test-topo -conf $CONF_FILE @@ -222,6 +243,7 @@ run() { then "$STORM_DIR/bin/storm" kill -w 0 test-topo || true sleep 10 + elif [ "START_SPARK_PROCESSING" = "$OPERATION" ]; then "$SPARK_DIR/bin/spark-submit" --master spark://localhost:7077 --class spark.benchmark.KafkaRedisAdvertisingStream ./spark-benchmarks/target/spark-benchmarks-0.1.0.jar "$CONF_FILE" & @@ -229,6 +251,7 @@ run() { elif [ "STOP_SPARK_PROCESSING" = "$OPERATION" ]; then stop_if_needed spark.benchmark.KafkaRedisAdvertisingStream "Spark Client Process" + elif [ "START_FLINK_PROCESSING" = "$OPERATION" ]; then "$FLINK_DIR/bin/flink" run ./flink-benchmarks/target/flink-benchmarks-0.1.0.jar --confPath $CONF_FILE & @@ -243,6 +266,22 @@ run() { "$FLINK_DIR/bin/flink" cancel $FLINK_ID sleep 3 fi + + elif [ "START_GEARPUMP_APP" = "$OPERATION" ]; + then + "$GEARPUMP_DIR/bin/gear" app -jar ./gearpump-benchmarks/target/gearpump-benchmarks-0.1.0.jar gearpump.benchmark.Advertising $CONF_FILE & + sleep 5 + elif [ "STOP_GEARPUMP_APP" = "$OPERATION" ]; + then + APP_ID=`"$GEARPUMP_DIR/bin/gear" info | grep "application:" | awk -F ',' '{print $1}' | awk '{print $2}'` + if [ "$APP_ID" == "" ]; + then + echo "Could not find Gearpump application to kill" + else + "$GEARPUMP_DIR/bin/gear" kill -appid $APP_ID + sleep 5 + fi + elif [ "STORM_TEST" = "$OPERATION" ]; then run "START_ZK" @@ -288,6 +327,21 @@ run() { run "STOP_KAFKA" run "STOP_REDIS" run "STOP_ZK" + elif [ "GEARPUMP_TEST" = "$OPERATION" ]; + then + run "START_ZK" + run "START_REDIS" + run "START_KAFKA" + run "START_GEARPUMP" + run "START_GEARPUMP_APP" + run "START_LOAD" + sleep $TEST_TIME + run "STOP_LOAD" + run "STOP_GEARPUMP_APP" + run "STOP_GEARPUMP" + run "STOP_KAFKA" + run "STOP_REDIS" + run "STOP_ZK" elif [ "STOP_ALL" = "$OPERATION" ]; then run "STOP_LOAD" @@ -297,6 +351,8 @@ run() { run "STOP_FLINK" run "STOP_STORM_TOPOLOGY" run "STOP_STORM" + run "STOP_GEARPUMP_APP" + run "STOP_GEARPUMP" run "STOP_KAFKA" run "STOP_REDIS" run "STOP_ZK" @@ -322,6 +378,8 @@ run() { echo "STOP_FLINK: kill flink processes" echo "START_SPARK: run spark processes" echo "STOP_SPARK: kill spark processes" + echo "START_GEARPUMP: run gearpump processes" + echo "STOP_GEARPUMP: kill gearpump processes" echo echo "START_STORM_TOPOLOGY: run the storm test topology" echo "STOP_STORM_TOPOLOGY: kill the storm test topology" @@ -329,10 +387,13 @@ run() { echo "STOP_FLINK_PROCESSSING: kill the flink test processing" echo "START_SPARK_PROCESSING: run the spark test processing" echo "STOP_SPARK_PROCESSSING: kill the spark test processing" + echo "START_GEARPUMP_PROCESSING: run the gearpump test processing" + echo "STOP_GEARPUMP_PROCESSSING: kill the gearpump test processing" echo echo "STORM_TEST: run storm test (assumes SETUP is done)" echo "FLINK_TEST: run flink test (assumes SETUP is done)" echo "SPARK_TEST: run spark test (assumes SETUP is done)" + echo "GEARPUMP_TEST: run gearpump test (assumes SETUP is done)" echo "STOP_ALL: stop everything" echo echo "HELP: print out this message" From e25780840d8af98072666edaa0eba5fd1d145694 Mon Sep 17 00:00:00 2001 From: huafengw Date: Fri, 2 Sep 2016 11:05:30 +0800 Subject: [PATCH 2/2] upgrade gearpump to 0.8.1 --- gearpump-benchmarks/pom.xml | 14 +++---- .../gearpump/benchmark/Advertising.scala | 37 ++++++++++++------- pom.xml | 29 ++++----------- stream-bench.sh | 16 ++++---- 4 files changed, 46 insertions(+), 50 deletions(-) diff --git a/gearpump-benchmarks/pom.xml b/gearpump-benchmarks/pom.xml index b78ec09a3..ab52b9401 100644 --- a/gearpump-benchmarks/pom.xml +++ b/gearpump-benchmarks/pom.xml @@ -16,18 +16,18 @@ - com.github.intel-hadoop - gearpump-streaming_${scala.binary.version} + org.apache.gearpump + gearpump-streaming_2.11 provided - com.github.intel-hadoop - gearpump-core_${scala.binary.version} + org.apache.gearpump + gearpump-core_2.11 provided - com.github.intel-hadoop - gearpump-external-kafka_${scala.binary.version} + org.apache.gearpump + gearpump-external-kafka_2.11 org.json @@ -68,7 +68,7 @@ - ${scala.version} + ${gp.scala.version} incremental true diff --git a/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala b/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala index ff42c73bb..470b2a4f0 100644 --- a/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala +++ b/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala @@ -1,20 +1,24 @@ package gearpump.benchmark +import java.util.Properties + import akka.actor.ActorSystem import benchmark.common.Utils import benchmark.common.advertising.{CampaignProcessorCommon, RedisAdCampaignCache} -import io.gearpump.Message -import io.gearpump.partitioner.{Partitioner, UnicastPartitioner, HashPartitioner} -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.streaming.kafka.lib.StringMessageDecoder -import io.gearpump.streaming.{StreamApplication, Processor} -import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory} -import io.gearpump.streaming.source.{DefaultTimeStampFilter, DataSourceProcessor} -import io.gearpump.streaming.task.{StartTime, Task, TaskContext} -import io.gearpump.util.{AkkaApp, Graph} +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.partitioner.{UnicastPartitioner, HashPartitioner} +import org.apache.gearpump.streaming.StreamApplication +import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.streaming.kafka.lib.source.StringMessageDecoder +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.kafka.KafkaSource +import org.apache.gearpump.streaming.source.DataSourceProcessor +import org.apache.gearpump.util.{Graph, AkkaApp} +import org.apache.gearpump.util.Graph._ import org.json.JSONObject -import io.gearpump.util.Graph.Node import scala.collection.JavaConverters._ object Advertising extends AkkaApp{ @@ -42,10 +46,15 @@ object Advertising extends AkkaApp{ val kafkaPort = commonConfig.get("kafka.port").asInstanceOf[Int] val brokerList = kafkaHosts.map(_ + ":" + kafkaPort).mkString(",") + val appName = "Advertising" val parallel = Math.max(1, cores / 7) + val props = new Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName) + props.put(KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[StringMessageDecoder]) val gearConfig = UserConfig.empty.withString("redis.host", redisHost) - val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList) - val source = new KafkaSource(topic, zookeeperConnect, offsetStorageFactory, new StringMessageDecoder, new DefaultTimeStampFilter) + val source = new KafkaSource(topic, props) val sourceProcessor = DataSourceProcessor(source, partitions) val deserializer = Processor[DeserializeTask](parallel) val filter = Processor[EventFilterTask](parallel) @@ -55,7 +64,7 @@ object Advertising extends AkkaApp{ val partitioner = new AdPartitioner val graph = Graph(sourceProcessor ~ new HashPartitioner ~> deserializer ~> filter ~> projection ~> join ~ partitioner ~> campaign) - StreamApplication("Advertising", graph, gearConfig) + StreamApplication(appName, graph, gearConfig) } override def main(akkaConf: Advertising.Config, args: Array[String]): Unit = { diff --git a/pom.xml b/pom.xml index 4ee31ea2e..05e6acbdc 100644 --- a/pom.xml +++ b/pom.xml @@ -12,20 +12,6 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t yahoo-low-latency-bechmarks 0.1.0 - - - releases-oss.sonatype.org - Sonatype Releases Repository - http://oss.sonatype.org/content/repositories/releases/ - - - - gearpump-shaded-repo - Vincent at Bintray - http://dl.bintray.com/fvunicorn/maven - - - UTF-8 UTF-8 @@ -33,9 +19,10 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t 0.8.2.1 0.10.1 0.10.0 - 0.7.5 + 0.8.1 2.10 2.10.4 + 2.11.8 20141113 2.4.2 1.2.2 @@ -81,18 +68,18 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t ${flink.version} - com.github.intel-hadoop - gearpump-core_${scala.binary.version} + org.apache.gearpump + gearpump-core_2.11 ${gearpump.version} - com.github.intel-hadoop - gearpump-streaming_${scala.binary.version} + org.apache.gearpump + gearpump-streaming_2.11 ${gearpump.version} - com.github.intel-hadoop - gearpump-external-kafka_${scala.binary.version} + org.apache.gearpump + gearpump-external-kafka_2.11 ${gearpump.version} diff --git a/stream-bench.sh b/stream-bench.sh index 9c52cb228..c38607605 100755 --- a/stream-bench.sh +++ b/stream-bench.sh @@ -18,14 +18,14 @@ SCALA_SUB_VERSION=${SCALA_SUB_VERSION:-"4"} STORM_VERSION=${STORM_VERSION:-"0.10.0"} FLINK_VERSION=${FLINK_VERSION:-"0.10.1"} SPARK_VERSION=${SPARK_VERSION:-"1.5.1"} -GEARPUMP_VERSION=${GEARPUMP_VERSION:-"0.7.5"} +GEARPUMP_VERSION=${GEARPUMP_VERSION:-"0.8.1"} STORM_DIR="apache-storm-$STORM_VERSION" REDIS_DIR="redis-$REDIS_VERSION" KAFKA_DIR="kafka_$SCALA_BIN_VERSION-$KAFKA_VERSION" FLINK_DIR="flink-$FLINK_VERSION" SPARK_DIR="spark-$SPARK_VERSION-bin-hadoop2.6" -GEARPUMP_DIR="gearpump-$SCALA_BIN_VERSION-$GEARPUMP_VERSION" +GEARPUMP_DIR="gearpump-2.11-$GEARPUMP_VERSION" #Get one of the closet apache mirrors APACHE_MIRROR=$(curl 'https://www.apache.org/dyn/closer.cgi' | grep -o '[^<]*' | sed 's/<[^>]*>//g' | head -1) @@ -212,15 +212,15 @@ run() { elif [ "START_GEARPUMP" = "$OPERATION" ]; then - start_if_needed io.gearpump.cluster.main.Master GearpumpMaster 3 $GEARPUMP_DIR/bin/master -ip 127.0.0.1 -port 3000 - start_if_needed io.gearpump.cluster.main.Worker GearpumpWorker 3 $GEARPUMP_DIR/bin/worker - start_if_needed io.gearpump.services.main.Services GearpumpDashboard 3 $GEARPUMP_DIR/bin/services + start_if_needed org.apache.gearpump.cluster.main.Master GearpumpMaster 3 $GEARPUMP_DIR/bin/master -ip 127.0.0.1 -port 3000 + start_if_needed org.apache.gearpump.cluster.main.Worker GearpumpWorker 3 $GEARPUMP_DIR/bin/worker + start_if_needed org.apache.gearpump.services.main.Services GearpumpDashboard 3 $GEARPUMP_DIR/bin/services sleep 3 elif [ "STOP_GEARPUMP" = "$OPERATION" ]; then - stop_if_needed io.gearpump.services.main.Services GearpumpDashboard - stop_if_needed io.gearpump.cluster.main.Worker GearpumpWorker - stop_if_needed io.gearpump.cluster.main.Master GearpumpMaster + stop_if_needed org.apache.gearpump.services.main.Services GearpumpDashboard + stop_if_needed org.apache.gearpump.cluster.main.Worker GearpumpWorker + stop_if_needed org.apache.gearpump.cluster.main.Master GearpumpMaster sleep 3 elif [ "START_LOAD" = "$OPERATION" ];