Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions tutorials/parser/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,19 @@ the commandline using `apex` cli script.

**Step 3**: During launch use `src/main/resources/META-INF/properties-xmlParseApplication.xml` as a custom configuration file; then verify
that the output by checking hdfs file path configured in properties-xmlParseApplication.xml

* **RegexParser App**

This application showcases how to use [RegexParser](https://datatorrent.com/docs/apidocs/com/datatorrent/contrib/parser/RegexParser.html) from [Apex Malhar](https://github.com/apache/apex-malhar) library.

Follow these steps to run this application:

**Step 1**: Build the code:

shell> mvn clean install

**Step 2**: Upload the `target/parser-1.0-SNAPSHOT.apa` to the UI console if available or launch it from
the commandline using `apex` cli script.

**Step 3**: During launch use `properties-regexParserApplication.xml` as a custom configuration file; then verify
that the output by checking hdfs file path configured in properties-regexParserApplication.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.datatorrent.tutorial.regexparser;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.parser.RegexParser;
import com.datatorrent.lib.appdata.schemas.SchemaUtils;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.tutorial.csvparser.FileOutputOperator;

@ApplicationAnnotation(name = "RegexParser")
public class RegexParserApplication implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
ServerLogGenerator logGenerator = dag.addOperator("logGenerator", ServerLogGenerator.class);
RegexParser regexParser = dag.addOperator("regexParser", RegexParser.class);
dag.setOutputPortAttribute(regexParser.out, Context.PortContext.TUPLE_CLASS, ServerLog.class);
FileOutputOperator regexWriter = dag.addOperator("regexWriter", FileOutputOperator.class);
FileOutputOperator regexErrorWriter = dag.addOperator("regexErrorWriter", FileOutputOperator.class);

dag.addStream("regexInput", logGenerator.outputPort, regexParser.in);
dag.addStream("regexOutput", regexParser.out, regexWriter.input);
dag.addStream("regexError", regexParser.err, regexErrorWriter.input);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.datatorrent.tutorial.regexparser;

import java.util.Date;

public class ServerLog
{
private Date date;
private int id;
private String signInId;
private String ipAddress;
private String serviceId;
private String accountId;
private String platform;


public int getId()
{
return id;
}

public void setId(int id)
{
this.id = id;
}

public Date getDate()
{
return date;
}

public void setDate(Date date)
{
this.date = date;
}

public String getSignInId()
{
return signInId;
}

public void setSignInId(String signInId)
{
this.signInId = signInId;
}

public String getIpAddress()
{
return ipAddress;
}

public void setIpAddress(String ipAddress)
{
this.ipAddress = ipAddress;
}

public String getServiceId()
{
return serviceId;
}

public void setServiceId(String serviceId)
{
this.serviceId = serviceId;
}

public String getAccountId()
{
return accountId;
}

public void setAccountId(String accountId)
{
this.accountId = accountId;
}

public String getPlatform()
{
return platform;
}

public void setPlatform(String platform)
{
this.platform = platform;
}

@Override
public String toString()
{
return "ServerLog{" +
"date=" + date +
", id=" + id +
", signInId='" + signInId + '\'' +
", ipAddress='" + ipAddress + '\'' +
", serviceId='" + serviceId + '\'' +
", accountId='" + accountId + '\'' +
", platform='" + platform + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.datatorrent.tutorial.regexparser;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.common.util.BaseOperator;

public class ServerLogGenerator extends BaseOperator implements InputOperator
{
public transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<byte[]>();
private int tupleRate = 10;
private transient int tuplesEmmitedinWindow = 0;
public int getTupleRate()
{
return tupleRate;
}

public void setTupleRate(int tupleRate)
{
this.tupleRate = tupleRate;
}

@Override
public void emitTuples()
{

while (tuplesEmmitedinWindow < tupleRate) {
String line = "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" +
" 2015:10:01:03:14:49 101 sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " +
"result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik";
outputPort.emit(line.getBytes());
tuplesEmmitedinWindow++;
}

}

@Override
public void endWindow()
{
tuplesEmmitedinWindow = 0;
super.endWindow();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?xml version="1.0"?>
<configuration>

<property>
<name>dt.application.RegexParser.operator.regexParser.prop.schema</name>
<value>{
"fields": [
{
"name": "date",
"type": "Date",
"constraints": {
"format": "yyyy:MM:dd:hh:mm:ss"
}
},
{
"name": "id",
"type": "Integer"
},
{
"name": "signInId",
"type": "String"

},
{
"name": "ipAddress",
"type": "String"
},
{
"name": "serviceId",
"type": "String"
},
{
"name": "accountId",
"type": "String"
},
{
"name": "platform",
"type": "String"
}
]
}
</value>
</property>

<!--The below property is set from RegexParserApplication.java. If it is set in both the places, the below value in property overrides the value set in RegexParserApplication.java -->
<!-- <property>
<name>dt.application.RegexParser.operator.regexParser.port.out.attr.TUPLE_CLASS</name>
<value>com.datatorrent.tutorial.regexparser.ServerLog</value>
</property>-->

<property>
<name>dt.application.RegexParser.operator.regexParser.prop.splitRegexPattern</name>
<value>.+\[SEQ=\w+\]\s*(\d+:[\d\d:]+)\s(\d+)\s* sign-in_id=(\S+) .*ip_address=(\S+).* service_id=(\S+).*account_id=(\S+).*platform=(\S+)</value>
</property>

<property>
<name>dt.application.RegexParser.operator.*.prop.filePath</name>
<value>/tmp/application/parser/regexparser</value>
</property>

<property>
<name>dt.application.RegexParser.operator.regexErrorWriter.prop.outputFileName</name>
<value>errordata</value>
</property>

<property>
<name>dt.application.RegexParser.operator.regexWriter.prop.outputFileName</name>
<value>outputdata</value>
</property>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.datatorrent.tutorial.regexparser;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.Callable;

import javax.validation.ConstraintViolationException;

import org.junit.Assert;
import org.junit.Test;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.LocalMode;
import com.datatorrent.stram.StramLocalCluster;

public class RegexParserApplicationTest
{

@Test
public void testApplication() throws IOException, Exception
{
try {
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
conf.addResource(this.getClass().getResourceAsStream("/properties-regexParserApplication.xml"));
conf.setLong("dt.application.RegexParser.operator.logGenerator.prop.tupleRate", 10);
final String dataFolderPath = conf.get("dt.application.RegexParser.operator.*.prop.filePath");
final String dataFileName = conf
.get("dt.application.RegexParser.operator.regexWriter.prop.outputFileName");

FileUtils.deleteDirectory(new File(dataFolderPath));
lma.prepareDAG(new RegexParserApplication(), conf);
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
if (new File(dataFolderPath).exists()) {
Collection<File> files = FileUtils.listFiles(new File(dataFolderPath), new WildcardFileFilter(dataFileName
+ "*"), null);
if (files.size() >= 1) {
File parsedFile = files.iterator().next();
String fileData = FileUtils.readFileToString(parsedFile);
String[] regexData = fileData.split("\n");
return regexData.length == 10;
}
}
return false;
}
});

lc.run(30 * 1000); // runs for 30 seconds and quits

Collection<File> files = FileUtils.listFiles(new File(dataFolderPath),
new WildcardFileFilter(dataFileName + "*"), null);
File parsedFile = files.iterator().next();
String fileData = FileUtils.readFileToString(parsedFile);
String[] logData = fileData.split("\n");
for (String logLine : logData) {
System.out.println(logLine);
Assert.assertTrue(logLine.contains("id=" + 101));
Assert.assertTrue(logLine.contains("signInId=" + "'11111@psop.com'"));
Assert.assertTrue(logLine.contains("serviceId=" + "'IP1234-NPB12345_00'"));
Assert.assertTrue(logLine.contains("accountId=" + "'11111'"));
Assert.assertTrue(logLine.contains("platform=" + "'pik'"));
}
} catch (ConstraintViolationException e) {
Assert.fail("constraint violations: " + e.getConstraintViolations());
}
}
}
Loading