Introducing kafka->kafka example

karol-brejna-i committed Jul 16, 2015
1 parent 37729e0 commit f68c538

Showing 10 changed files with 543 additions and 104 deletions.
126 changes: 110 additions & 16 deletions README.md
@@ -1,12 +1,30 @@
This repository holds some common gearpump usage patterns with Java.

The examples include:
* word count -- a simple Java app that shows the structure of a gearpump app
* Kafka -> Kafka pipeline -- a very simple example that shows how to read from and write to Kafka topics
* Kafka -> HBase pipeline -- how to read from a Kafka topic and write to HBase

The following sections will give you information about:
* How to build and run the examples
* How each example works

# Building and running the examples

The repository is organized in one maven project that contains all the examples.


## Build

To build the examples run:

`mvn package`

After the build, the jar is available at `target/streaming-java-template-$VERSION.jar`.

## Running an example

1. Start the gearpump cluster (0.4)

a) Download from http://www.gearpump.io/site/downloads/

@@ -20,24 +38,100 @@ How to run
bin/services
```

2. Submit the jar
```bash
bin/gear app -jar path/to/streaming-java-template-$VERSION.jar <app mainclass with package>
```

for example:

```bash
bin/gear app -jar target/streaming-java-template-$VERSION.jar javatemplate.WordCount
```


3. Check the UI
http://127.0.0.1:8090/


> NOTE:
>
> Please use Java7 to run the cluster.
>
> You can set the ENV JAVA_HOME.

> On Windows:
> `set JAVA_HOME={path_to_java_7}`
>
> On Linux:
> `export JAVA_HOME={path_to_java_7}`

# Examples description

## kafka2kafka-pipeline
A very simple example that shows how to read from and write to Kafka topics.

The example uses the Gearpump Connector API (`KafkaSource` and `KafkaSink`), which makes simple operations with Kafka straightforward.


When defining the Kafka source, you need to provide the topic name and the ZooKeeper location:
```Java
KafkaSource kafkaSource = new KafkaSource("inputTopic", "localhost:2181");
```
When defining the Kafka sink (output), you just give the destination topic name and the Kafka broker address:
```Java
KafkaSink kafkaSink = new KafkaSink("outputTopic", "localhost:9092");
```
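The source and sink can then be wired into an application following the pattern used by `KafkaWordCount` in this repository. A minimal sketch, with the conversion steps elided; `DataSinkProcessor` and the application name are assumptions here (a sink-side counterpart to the `DataSourceProcessor` shown elsewhere in this repo), not confirmed API:

```Java
ClientContext context = ClientContext.apply();
UserConfig appConfig = UserConfig.empty();

// Read raw byte[] messages from the input topic.
KafkaSource kafkaSource = new KafkaSource("inputTopic", "localhost:2181");
Processor sourceProcessor =
    DataSourceProcessor.apply(kafkaSource, 1, "kafka_source", appConfig, context.system());

// Write messages to the output topic (DataSinkProcessor is an assumed counterpart).
KafkaSink kafkaSink = new KafkaSink("outputTopic", "localhost:9092");
Processor sinkProcessor =
    DataSinkProcessor.apply(kafkaSink, 1, "kafka_sink", appConfig, context.system());

// Connect source to sink; a real app would put the conversion tasks in between.
Graph graph = Graph.empty();
graph.addVertex(sourceProcessor);
graph.addVertex(sinkProcessor);
graph.addEdge(sourceProcessor, new ShufflePartitioner(), sinkProcessor);

StreamApplication app = StreamApplication.apply("Kafka2Kafka", graph, appConfig);
context.submit(app);
context.close();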
Keep in mind that the Kafka source processor produces each message as a byte array (`byte[]`), while the Kafka sink processor expects the message to be a `scala.Tuple`.

The example shows dedicated steps that do the necessary conversions. (The conversions don't need to be separate steps; you could include them in the tasks that do the actual computation.)
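The conversion logic itself boils down to byte/string handling. A minimal, self-contained sketch, assuming UTF-8 encoded messages; the gearpump `Task` wrapper and the `scala.Tuple` construction for the sink side are framework-specific and omitted here, and the class name is made up for illustration:

```Java
import java.nio.charset.StandardCharsets;

public class MessageConversions {

    // Kafka source tasks hand messages over as raw byte arrays;
    // decode them before doing any string processing.
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Encode the processed result back to bytes for the output side.
    public static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = encode("hello gearpump");
        System.out.println(decode(raw)); // prints "hello gearpump"
    }
}
```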

### Dependencies
This example uses ZooKeeper and Kafka, which you need to set up before running it.

Start ZooKeeper and Kafka:

```bash
zookeeper/bin/zkServer.sh start
kafka/bin/kafka-server-start.sh kafka/config/server.properties
```

(Tested with ZooKeeper 3.4.6 and Kafka 2.11-0.8.2.1 with default settings.)

The app will read messages from `inputTopic` and write to `outputTopic`, so you may need to create them beforehand:

```bash
kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic inputTopic
kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic outputTopic
```
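To confirm the topics exist before submitting the app, you can list them with the standard Kafka tooling (assuming the same installation layout as above):

```bash
kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
```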

### Testing
After you have prepared the Kafka topics and deployed the app to the gearpump cluster, you can start using it.

Start producing some messages to the input topic:

```bash
kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic inputTopic
```


Check if anything appears on the output topic:

```bash
kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic outputTopic --from-beginning
```

The tasks write to the application logs, so you can browse them to see the execution flow.

The logs should be under a location similar to this:
```
$GEARPUMP_HOME/logs/applicationData/<<user>>/<<date>>/<<appN>>/
```
6 changes: 3 additions & 3 deletions pom.xml
@@ -8,7 +8,7 @@
<artifactId>streaming-java-template</artifactId>
<version>1.1-SNAPSHOT</version>
<properties>
<gearpumpVersion>0.3.8-SNAPSHOT</gearpumpVersion>
<gearpumpVersion>0.4</gearpumpVersion>
</properties>


@@ -66,7 +66,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<version>3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
@@ -76,7 +76,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.6</version>
<version>2.7</version>
<configuration>
<outputDirectory>${project.build.outputDirectory}/resources</outputDirectory>
</configuration>
71 changes: 45 additions & 26 deletions src/main/java/kafka/HBaseSinkTask.java
@@ -1,3 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package kafka;

import org.apache.gearpump.Message;
@@ -10,40 +29,40 @@

public class HBaseSinkTask extends Task {

public static String TABLE_NAME = "hbase.table.name";
public static String COLUMN_FAMILY = "hbase.table.column.family";
public static String COLUMN_NAME = "hbase.table.column.name";
public static String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
public static String TABLE_NAME = "hbase.table.name";
public static String COLUMN_FAMILY = "hbase.table.column.family";
public static String COLUMN_NAME = "hbase.table.column.name";
public static String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";


private HBaseSink sink;
private String columnFamily;
private HBaseSink sink;
private String columnFamily;

public HBaseSinkTask(TaskContext taskContext, UserConfig userConf) {
super(taskContext, userConf);
public HBaseSinkTask(TaskContext taskContext, UserConfig userConf) {
super(taskContext, userConf);

String tableName = userConf.getString(TABLE_NAME).get();
String zkQuorum = userConf.getString(ZOOKEEPER_QUORUM).get();
String tableName = userConf.getString(TABLE_NAME).get();
String zkQuorum = userConf.getString(ZOOKEEPER_QUORUM).get();

columnFamily = userConf.getString(COLUMN_FAMILY).get();
columnFamily = userConf.getString(COLUMN_FAMILY).get();

Configuration hbaseConf = new Configuration();
hbaseConf.set(ZOOKEEPER_QUORUM, zkQuorum);
Configuration hbaseConf = new Configuration();
hbaseConf.set(ZOOKEEPER_QUORUM, zkQuorum);


sink = new HBaseSink(tableName, hbaseConf);
}
sink = new HBaseSink(tableName, hbaseConf);
}

@Override
public void onStart(StartTime startTime) {
//skip
}
@Override
public void onStart(StartTime startTime) {
//skip
}

@Override
public void onNext(Message message) {
String[] wordcount = ((String) message.msg()).split(":");
String word = wordcount[0];
String count = wordcount[1];
sink.insert(message.timestamp() + "", columnFamily, word, count);
}
@Override
public void onNext(Message message) {
String[] wordcount = ((String) message.msg()).split(":");
String word = wordcount[0];
String count = wordcount[1];
sink.insert(message.timestamp() + "", columnFamily, word, count);
}
}
93 changes: 56 additions & 37 deletions src/main/java/kafka/KafkaWordCount.java
@@ -1,3 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package kafka;


@@ -15,55 +34,55 @@

public class KafkaWordCount {

public static void main(String[] args) {
public static void main(String[] args) {

ClientContext context = ClientContext.apply();
ClientContext context = ClientContext.apply();

UserConfig appConfig = UserConfig.empty();
UserConfig appConfig = UserConfig.empty();

// we will create two kafka reader task to read from kafka queue.
int sourceNum = 2;
// please create "topic1" on kafka and produce some data to it
KafkaSource source = new KafkaSource("topic1", "localhost:2181");
Processor sourceProcessor = DataSourceProcessor.apply(source, sourceNum, "kafka_source", appConfig, context.system());
// we will create two kafka reader task to read from kafka queue.
int sourceNum = 2;
// please create "topic1" on kafka and produce some data to it
KafkaSource source = new KafkaSource("topic1", "localhost:2181");
Processor sourceProcessor = DataSourceProcessor.apply(source, sourceNum, "kafka_source", appConfig, context.system());

// For split task, we config to create two tasks
int splitTaskNumber = 2;
Processor split = new DefaultProcessor(splitTaskNumber, "split", null, Split.class);
// For split task, we config to create two tasks
int splitTaskNumber = 2;
Processor split = new DefaultProcessor(splitTaskNumber, "split", null, Split.class);

// sum task
int sumTaskNumber = 2;
Processor sum = new DefaultProcessor(sumTaskNumber, "sum", null, Sum.class);
// sum task
int sumTaskNumber = 2;
Processor sum = new DefaultProcessor(sumTaskNumber, "sum", null, Sum.class);

// hbase sink
int sinkNumber = 2;
// please create HBase table "pipeline" and column family "wordcount"
UserConfig config = UserConfig.empty()
.withString(HBaseSinkTask.ZOOKEEPER_QUORUM, "localhost:2181")
.withString(HBaseSinkTask.TABLE_NAME, "pipeline")
.withString(HBaseSinkTask.COLUMN_FAMILY, "wordcount");
Processor sinkProcessor = new DefaultProcessor(sinkNumber, "hbase_sink", config, HBaseSinkTask.class);
// hbase sink
int sinkNumber = 2;
// please create HBase table "pipeline" and column family "wordcount"
UserConfig config =
UserConfig.empty().withString(HBaseSinkTask.ZOOKEEPER_QUORUM, "localhost:2181")
.withString(HBaseSinkTask.TABLE_NAME, "pipeline")
.withString(HBaseSinkTask.COLUMN_FAMILY, "wordcount");
Processor sinkProcessor = new DefaultProcessor(sinkNumber, "hbase_sink", config, HBaseSinkTask.class);


Partitioner shuffle = new ShufflePartitioner();
Partitioner hash = new HashPartitioner();
Partitioner shuffle = new ShufflePartitioner();
Partitioner hash = new HashPartitioner();

Graph graph = Graph.empty();
graph.addVertex(sourceProcessor);
graph.addVertex(split);
graph.addVertex(sum);
graph.addVertex(sinkProcessor);
Graph graph = Graph.empty();
graph.addVertex(sourceProcessor);
graph.addVertex(split);
graph.addVertex(sum);
graph.addVertex(sinkProcessor);

graph.addEdge(sourceProcessor, shuffle, split);
graph.addEdge(split, hash, sum);
graph.addEdge(sum, hash, sinkProcessor);
graph.addEdge(sourceProcessor, shuffle, split);
graph.addEdge(split, hash, sum);
graph.addEdge(sum, hash, sinkProcessor);

StreamApplication app = StreamApplication.apply("KafkaWordCount", graph, appConfig);
StreamApplication app = StreamApplication.apply("KafkaWordCount", graph, appConfig);

context.submit(app);
context.submit(app);

// clean resource
context.close();
}
// clean resource
context.close();
}
}

