From f68c5381ef7a30f5774de23cabc57fa7aeaf37c5 Mon Sep 17 00:00:00 2001 From: karol-brejna-i Date: Thu, 16 Jul 2015 18:24:31 +0200 Subject: [PATCH] Introducing kafka->kafka example --- README.md | 126 +++++++++++++++--- pom.xml | 6 +- src/main/java/kafka/HBaseSinkTask.java | 71 ++++++---- src/main/java/kafka/KafkaWordCount.java | 93 ++++++++----- src/main/java/kafka/Split.java | 65 ++++++--- src/main/java/kafka/Sum.java | 1 + .../kafka2kafka/ByteArray2StringTask.java | 71 ++++++++++ src/main/java/kafka2kafka/LogMessageTask.java | 56 ++++++++ src/main/java/kafka2kafka/PipelineApp.java | 84 ++++++++++++ .../java/kafka2kafka/String2Tuple2Task.java | 74 ++++++++++ 10 files changed, 543 insertions(+), 104 deletions(-) create mode 100644 src/main/java/kafka2kafka/ByteArray2StringTask.java create mode 100644 src/main/java/kafka2kafka/LogMessageTask.java create mode 100644 src/main/java/kafka2kafka/PipelineApp.java create mode 100644 src/main/java/kafka2kafka/String2Tuple2Task.java diff --git a/README.md b/README.md index f790d5c..6c5bda9 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,30 @@ -How to Build -============= -mvn package +This repository holds some common gearpump usage patterns with Java. -After build, there is a jar under `target/streaming-java-template-1.1-SNAPSHOT.jar`. +The examples include: +* word count -- simple Java app that shows the structure of gearpump app +* Kafka -> Kafka pipeline -- very simple example that shows how to read and write from Kafka topics +* Kafka -> HBase pipeline -- how to read from Kafka topic, how to write to HBase -How to run -============ -1. Start the gearpump cluster (0.3.8-SNAPSHOT) +The following sections will give you information about: +* How to build and run examples +* How specific example works + +# Building and running the examples + +The repository is organized in one maven project that contains all the examples. + + +## Build + +To build the examples run: + +`mvn package` + +After build, there is a jar under `target/streaming-java-template-$VERSION.jar`. + +## Running an example + +1. Start the gearpump cluster (0.4) a) Download from http://www.gearpump.io/site/downloads/ @@ -20,24 +38,100 @@ How to run bin/services ``` -2. Submit the wordcount-jar +2. Submit the jar + ```bash + bin/gear app -jar path/to/streaming-java-template-$VERSION.jar + ``` + + for example: + ```bash - bin/gear app -jar path/to/streaming-java-template-1.1-SNAPSHOT.jar javatemplate.WordCount + bin/gear app -jar target/streaming-java-template-$VERSION.jar javatemplate.WordCount ``` + 3. Check the UI http://127.0.0.1:8090/ -NOTE: + + +> NOTE: +> +> Please use Java7 to run the cluster. +> +> You can set the ENV JAVA_HOME. + +> On windows: +> set JAVA_HOME={path_to_java_7} +> +> On Linux +> export JAVA_HOME={path_to_java_7} + +# Examples description + +## kafka2kafka-pipeline +Very simple example that shows how to read and write from Kafka topics. + +The example makes use of Gearpump Connector API, `KafkaSource` and `KafkaSink`, that make simple operations with Kafka super easy. + + +When defining Kafka source, you'll need to provide topic name and zookeeper location: +```Java +KafkaSource kafkaSource = new KafkaSource("inputTopic", "localhost:2181"); ``` -Please use Java7 to run the cluster. -You can set the ENV JAVA_HOME. +When defining Kafka sink (output), you will just give the destination topic name and Kafka broker address: +```Java +KafkaSink kafkaSink = new KafkaSink("outputTopic", "localhost:9092"); +``` -On windows: -set JAVA_HOME={path_to_java_7} +Keep in mind, that Kafka source processor produces message as byte array (`byte[]`). +Also, Kafka sink processor expects the message to be scala.Tuple. -on Linux -export JAVA_HOME={path_to_java_7} +The example shows dedicated steps that do the necessary conversions. +(The conversions don't need to be a separate step, you could include them in other task that do actual computation.) + +### Dependencies +This example uses zookeeper and Kafka. You need to set them up before running. + +Start zookeeper and Kafka: + +```bash +zookeeper/bin/zkServer.sh start + +kafka/bin/kafka-server-start.sh kafka/config/server.properties +``` + +(Tested with zookeeper 3.4.6 and Kafka 2.11-0.8.2.1. with default settings.) + +The app will read messages from `inputTopic` and write to `outputTopic`, so you may need to create them beforehand: + +```bash +kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic inputTopic + +kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic outputTopic +``` + +### Testing +After you prepared Kafka topics and deployed the app to gearpump cluster, you can start using it. + +Start producing some messages to input topic: + +```bash +kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic inputTopic +``` + + +Check if anything appears on output topic: + +```bash +kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic outputTopic --from-beginning +``` + +The tasks write to application logs, so you can browse them to see execution flow. + +The logs should be under location similar to this: +``` +$GEARPUMP_HOME/logs/applicationData/<>/<>/<>/ ``` diff --git a/pom.xml b/pom.xml index f94b82e..8d85097 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ streaming-java-template 1.1-SNAPSHOT - 0.3.8-SNAPSHOT + 0.4 @@ -66,7 +66,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.1 + 3.2 1.6 1.6 @@ -76,7 +76,7 @@ org.apache.maven.plugins maven-resources-plugin - 2.6 + 2.7 ${project.build.outputDirectory}/resources diff --git a/src/main/java/kafka/HBaseSinkTask.java b/src/main/java/kafka/HBaseSinkTask.java index a4624f2..ecece10 100644 --- a/src/main/java/kafka/HBaseSinkTask.java +++ b/src/main/java/kafka/HBaseSinkTask.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package kafka; import org.apache.gearpump.Message; @@ -10,40 +29,40 @@ public class HBaseSinkTask extends Task { - public static String TABLE_NAME = "hbase.table.name"; - public static String COLUMN_FAMILY = "hbase.table.column.family"; - public static String COLUMN_NAME = "hbase.table.column.name"; - public static String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + public static String TABLE_NAME = "hbase.table.name"; + public static String COLUMN_FAMILY = "hbase.table.column.family"; + public static String COLUMN_NAME = "hbase.table.column.name"; + public static String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - private HBaseSink sink; - private String columnFamily; + private HBaseSink sink; + private String columnFamily; - public HBaseSinkTask(TaskContext taskContext, UserConfig userConf) { - super(taskContext, userConf); + public HBaseSinkTask(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); - String tableName = userConf.getString(TABLE_NAME).get(); - String zkQuorum = userConf.getString(ZOOKEEPER_QUORUM).get(); + String tableName = userConf.getString(TABLE_NAME).get(); + String zkQuorum = userConf.getString(ZOOKEEPER_QUORUM).get(); - columnFamily = userConf.getString(COLUMN_FAMILY).get(); + columnFamily = userConf.getString(COLUMN_FAMILY).get(); - Configuration hbaseConf = new Configuration(); - hbaseConf.set(ZOOKEEPER_QUORUM, zkQuorum); + Configuration hbaseConf = new Configuration(); + hbaseConf.set(ZOOKEEPER_QUORUM, zkQuorum); - sink = new HBaseSink(tableName, hbaseConf); - } + sink = new HBaseSink(tableName, hbaseConf); + } - @Override - public void onStart(StartTime startTime) { - //skip - } + @Override + public void onStart(StartTime startTime) { + //skip + } - @Override - public void onNext(Message message) { - String[] wordcount = ((String) message.msg()).split(":"); - String word = wordcount[0]; - String count = wordcount[1]; - sink.insert(message.timestamp() + "", columnFamily, word, count); - } + @Override + public void onNext(Message message) { + String[] wordcount = ((String) message.msg()).split(":"); + String word = wordcount[0]; + String count = wordcount[1]; + sink.insert(message.timestamp() + "", columnFamily, word, count); + } } diff --git a/src/main/java/kafka/KafkaWordCount.java b/src/main/java/kafka/KafkaWordCount.java index 3682009..4ef7fb6 100644 --- a/src/main/java/kafka/KafkaWordCount.java +++ b/src/main/java/kafka/KafkaWordCount.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package kafka; @@ -15,55 +34,55 @@ public class KafkaWordCount { - public static void main(String[] args) { + public static void main(String[] args) { - ClientContext context = ClientContext.apply(); + ClientContext context = ClientContext.apply(); - UserConfig appConfig = UserConfig.empty(); + UserConfig appConfig = UserConfig.empty(); - // we will create two kafka reader task to read from kafka queue. - int sourceNum = 2; - // please create "topic1" on kafka and produce some data to it - KafkaSource source = new KafkaSource("topic1", "localhost:2181"); - Processor sourceProcessor = DataSourceProcessor.apply(source, sourceNum, "kafka_source", appConfig, context.system()); + // we will create two kafka reader task to read from kafka queue. + int sourceNum = 2; + // please create "topic1" on kafka and produce some data to it + KafkaSource source = new KafkaSource("topic1", "localhost:2181"); + Processor sourceProcessor = DataSourceProcessor.apply(source, sourceNum, "kafka_source", appConfig, context.system()); - // For split task, we config to create two tasks - int splitTaskNumber = 2; - Processor split = new DefaultProcessor(splitTaskNumber, "split", null, Split.class); + // For split task, we config to create two tasks + int splitTaskNumber = 2; + Processor split = new DefaultProcessor(splitTaskNumber, "split", null, Split.class); - // sum task - int sumTaskNumber = 2; - Processor sum = new DefaultProcessor(sumTaskNumber, "sum", null, Sum.class); + // sum task + int sumTaskNumber = 2; + Processor sum = new DefaultProcessor(sumTaskNumber, "sum", null, Sum.class); - // hbase sink - int sinkNumber = 2; - // please create HBase table "pipeline" and column family "wordcount" - UserConfig config = UserConfig.empty() - .withString(HBaseSinkTask.ZOOKEEPER_QUORUM, "localhost:2181") - .withString(HBaseSinkTask.TABLE_NAME, "pipeline") - .withString(HBaseSinkTask.COLUMN_FAMILY, "wordcount"); - Processor sinkProcessor = new DefaultProcessor(sinkNumber, "hbase_sink", config, HBaseSinkTask.class); + // hbase sink + int sinkNumber = 2; + // please create HBase table "pipeline" and column family "wordcount" + UserConfig config = + UserConfig.empty().withString(HBaseSinkTask.ZOOKEEPER_QUORUM, "localhost:2181") + .withString(HBaseSinkTask.TABLE_NAME, "pipeline") + .withString(HBaseSinkTask.COLUMN_FAMILY, "wordcount"); + Processor sinkProcessor = new DefaultProcessor(sinkNumber, "hbase_sink", config, HBaseSinkTask.class); - Partitioner shuffle = new ShufflePartitioner(); - Partitioner hash = new HashPartitioner(); + Partitioner shuffle = new ShufflePartitioner(); + Partitioner hash = new HashPartitioner(); - Graph graph = Graph.empty(); - graph.addVertex(sourceProcessor); - graph.addVertex(split); - graph.addVertex(sum); - graph.addVertex(sinkProcessor); + Graph graph = Graph.empty(); + graph.addVertex(sourceProcessor); + graph.addVertex(split); + graph.addVertex(sum); + graph.addVertex(sinkProcessor); - graph.addEdge(sourceProcessor, shuffle, split); - graph.addEdge(split, hash, sum); - graph.addEdge(sum, hash, sinkProcessor); + graph.addEdge(sourceProcessor, shuffle, split); + graph.addEdge(split, hash, sum); + graph.addEdge(sum, hash, sinkProcessor); - StreamApplication app = StreamApplication.apply("KafkaWordCount", graph, appConfig); + StreamApplication app = StreamApplication.apply("KafkaWordCount", graph, appConfig); - context.submit(app); + context.submit(app); - // clean resource - context.close(); - } + // clean resource + context.close(); + } } diff --git a/src/main/java/kafka/Split.java b/src/main/java/kafka/Split.java index 3a39259..aba319f 100644 --- a/src/main/java/kafka/Split.java +++ b/src/main/java/kafka/Split.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package kafka; import org.apache.gearpump.Message; @@ -9,27 +28,29 @@ public class Split extends Task { - private TaskContext context; - private UserConfig userConf; - - public Split(TaskContext taskContext, UserConfig userConf) { - super(taskContext, userConf); - this.context = taskContext; - this.userConf = userConf; - } - - private Long now() { - return System.currentTimeMillis(); - } - - public void onStart(StartTime startTime) { - } - - public void onNext(Message message) { - String line = new String((byte[])(message.msg())); - String[] words = line.split("\\s+"); - for (int i = 0; i < words.length; i++) { - context.output(new Message(words[i], now())); - } + private TaskContext context; + private UserConfig userConf; + + public Split(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); + this.context = taskContext; + this.userConf = userConf; + } + + private Long now() { + return System.currentTimeMillis(); + } + + @Override + public void onStart(StartTime startTime) { + } + + @Override + public void onNext(Message message) { + String line = new String((byte[]) (message.msg())); + String[] words = line.split("\\s+"); + for (int i = 0; i < words.length; i++) { + context.output(new Message(words[i], now())); } + } } diff --git a/src/main/java/kafka/Sum.java b/src/main/java/kafka/Sum.java index 235fe15..c6e627c 100644 --- a/src/main/java/kafka/Sum.java +++ b/src/main/java/kafka/Sum.java @@ -14,6 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * */ package kafka; diff --git a/src/main/java/kafka2kafka/ByteArray2StringTask.java b/src/main/java/kafka2kafka/ByteArray2StringTask.java new file mode 100644 index 0000000..558a3fc --- /dev/null +++ b/src/main/java/kafka2kafka/ByteArray2StringTask.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka2kafka; + +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.task.StartTime; +import org.apache.gearpump.streaming.task.Task; +import org.apache.gearpump.streaming.task.TaskContext; +import org.slf4j.Logger; + +public class ByteArray2StringTask extends Task { + + private TaskContext context; + private UserConfig userConf; + + private Logger LOG = super.LOG(); + + public ByteArray2StringTask(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); + this.context = taskContext; + this.userConf = userConf; + } + + private Long now() { + return System.currentTimeMillis(); + } + + @Override + public void onStart(StartTime startTime) { + LOG.info("ByteArray2StringTask.onStart [" + startTime + "]"); + } + + /** + * Convert message payload to String if it is byte[]. Leave as is otherwise. + * + * @param messagePayLoad + */ + @Override + public void onNext(Message messagePayLoad) { + LOG.info("ByteArray2StringTask.onNext messagePayLoad = [" + messagePayLoad + "]"); + LOG.debug("message.msg class" + messagePayLoad.msg().getClass().getCanonicalName()); + + Object msg = messagePayLoad.msg(); + + if (msg instanceof byte[]) { + LOG.debug("converting to String."); + context.output(new Message(new String((byte[]) msg), now())); + } else { + LOG.debug("sending message as is."); + context.output(new Message(msg, now())); + } + } +} diff --git a/src/main/java/kafka2kafka/LogMessageTask.java b/src/main/java/kafka2kafka/LogMessageTask.java new file mode 100644 index 0000000..7f130d8 --- /dev/null +++ b/src/main/java/kafka2kafka/LogMessageTask.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka2kafka; + +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.task.StartTime; +import org.apache.gearpump.streaming.task.Task; +import org.apache.gearpump.streaming.task.TaskContext; +import org.slf4j.Logger; + +public class LogMessageTask extends Task { + + private TaskContext context; + private UserConfig userConf; + + private Logger LOG = super.LOG(); + + public LogMessageTask(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); + this.context = taskContext; + this.userConf = userConf; + } + + private Long now() { + return System.currentTimeMillis(); + } + + @Override + public void onStart(StartTime startTime) { + LOG.info("LogMessageTask.onStart startTime [" + startTime + "]"); + } + + @Override + public void onNext(Message messagePayLoad) { + LOG.info("LogMessageTask.onNext messagePayLoad = [" + messagePayLoad + "]"); + context.output(new Message(messagePayLoad.msg(), now())); + } +} diff --git a/src/main/java/kafka2kafka/PipelineApp.java b/src/main/java/kafka2kafka/PipelineApp.java new file mode 100644 index 0000000..690a580 --- /dev/null +++ b/src/main/java/kafka2kafka/PipelineApp.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka2kafka; + +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.partitioner.HashPartitioner; +import org.apache.gearpump.partitioner.Partitioner; +import org.apache.gearpump.partitioner.ShufflePartitioner; +import org.apache.gearpump.streaming.Processor; +import org.apache.gearpump.streaming.Processor.DefaultProcessor; +import org.apache.gearpump.streaming.StreamApplication; +import org.apache.gearpump.streaming.kafka.KafkaSink; +import org.apache.gearpump.streaming.kafka.KafkaSource; +import org.apache.gearpump.streaming.sink.DataSinkProcessor; +import org.apache.gearpump.streaming.source.DataSourceProcessor; +import org.apache.gearpump.util.Graph; + +public class PipelineApp { + + public static void main(String[] args) { + ClientContext context = ClientContext.apply(); + UserConfig appConfig = UserConfig.empty(); + + int taskNumber = 1; + + // kafka source + KafkaSource kafkaSource = new KafkaSource("inputTopic", "localhost:2181"); + Processor sourceProcessor = DataSourceProcessor.apply(kafkaSource, taskNumber, "kafkaSource", + appConfig, context.system()); + + // converter (converts byte[] message to String -- kafka produces byte[]) + Processor convert2StringProcessor = new DefaultProcessor(taskNumber, "converter", null, ByteArray2StringTask.class); + + // converter (converts String message to scala.Tuple2 -- kafka sink needs it) + Processor convert2TupleProcessor = new DefaultProcessor(taskNumber, "converter", null, String2Tuple2Task.class); + + // simple processor (represents processing you would do on kafka messages stream; writes payload to logs) + Processor logProcessor = new DefaultProcessor(taskNumber, "forwarder", null, LogMessageTask.class); + + // kafka sink + KafkaSink kafkaSink = new KafkaSink("outputTopic", "localhost:9092"); + Processor sinkProcessor = DataSinkProcessor.apply(kafkaSink, taskNumber, "sink", appConfig, context.system()); + + Graph graph = Graph.empty(); + graph.addVertex(sourceProcessor); + graph.addVertex(convert2StringProcessor); + graph.addVertex(logProcessor); + graph.addVertex(convert2TupleProcessor); + graph.addVertex(sinkProcessor); + + Partitioner partitioner = new HashPartitioner(); + Partitioner shufflePartitioner = new ShufflePartitioner(); + + graph.addEdge(sourceProcessor, shufflePartitioner, convert2StringProcessor); + graph.addEdge(convert2StringProcessor, partitioner, logProcessor); + graph.addEdge(logProcessor, partitioner, convert2TupleProcessor); + graph.addEdge(convert2TupleProcessor, partitioner, sinkProcessor); + + // submit app + StreamApplication app = StreamApplication.apply("kafka2kafka", graph, appConfig); + context.submit(app); + + // clean resource + context.close(); + } +} diff --git a/src/main/java/kafka2kafka/String2Tuple2Task.java b/src/main/java/kafka2kafka/String2Tuple2Task.java new file mode 100644 index 0000000..b8f31fa --- /dev/null +++ b/src/main/java/kafka2kafka/String2Tuple2Task.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package kafka2kafka; + +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.task.StartTime; +import org.apache.gearpump.streaming.task.Task; +import org.apache.gearpump.streaming.task.TaskContext; +import org.slf4j.Logger; +import scala.Tuple2; + +import java.io.UnsupportedEncodingException; + +public class String2Tuple2Task extends Task { + + private TaskContext context; + private UserConfig userConf; + + private Logger LOG = super.LOG(); + + public String2Tuple2Task(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); + this.context = taskContext; + this.userConf = userConf; + } + + private Long now() { + return System.currentTimeMillis(); + } + + @Override + public void onStart(StartTime startTime) { + LOG.info("String2Tuple2Task.onStart [" + startTime + "]"); + } + + @Override + public void onNext(Message messagePayLoad) { + LOG.info("String2Tuple2Task.onNext messagePayLoad = [" + messagePayLoad + "]"); + + Object msg = messagePayLoad.msg(); + + byte[] key = null; + byte[] value = null; + try { + LOG.info("converting to Tuple2"); + key = "message".getBytes("UTF-8"); + value = ((String) msg).getBytes("UTF-8"); + Tuple2 tuple = new Tuple2(key, value); + context.output(new Message(tuple, now())); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + LOG.info("sending message as is."); + context.output(new Message(msg, now())); + } + } +}