From 4774be88dac3dd783021740f035a18374c31cdfa Mon Sep 17 00:00:00 2001 From: venkateshDT Date: Mon, 27 Feb 2017 01:50:21 -0800 Subject: [PATCH] Regex Parser example --- tutorials/parser/README.md | 16 +++ .../regexparser/RegexParserApplication.java | 30 ++++++ .../tutorial/regexparser/ServerLog.java | 99 +++++++++++++++++++ .../regexparser/ServerLogGenerator.java | 42 ++++++++ .../properties-regexParserApplication.xml | 70 +++++++++++++ .../RegexParserApplicationTest.java | 77 +++++++++++++++ .../properties-regexParserApplication.xml | 68 +++++++++++++ 7 files changed, 402 insertions(+) create mode 100644 tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/RegexParserApplication.java create mode 100644 tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/ServerLog.java create mode 100644 tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/ServerLogGenerator.java create mode 100644 tutorials/parser/src/site/conf/properties-regexParserApplication.xml create mode 100644 tutorials/parser/src/test/java/com/datatorrent/tutorial/regexparser/RegexParserApplicationTest.java create mode 100644 tutorials/parser/src/test/resources/properties-regexParserApplication.xml diff --git a/tutorials/parser/README.md b/tutorials/parser/README.md index da230e3..b7f930a 100644 --- a/tutorials/parser/README.md +++ b/tutorials/parser/README.md @@ -35,3 +35,19 @@ the commandline using `apex` cli script. **Step 3**: During launch use `src/main/resources/META-INF/properties-xmlParseApplication.xml` as a custom configuration file; then verify that the output by checking hdfs file path configured in properties-xmlParseApplication.xml + +* **RegexParser App** + +This application showcases how to use [RegexParser](https://datatorrent.com/docs/apidocs/com/datatorrent/contrib/parser/RegexParser.html) from [Apex Malhar](https://github.com/apache/apex-malhar) library. 
+
+Follow these steps to run this application:
+
+**Step 1**: Build the code:
+
+    shell> mvn clean install
+
+**Step 2**: Upload the `target/parser-1.0-SNAPSHOT.apa` to the UI console if available or launch it from
+the commandline using `apex` cli script.
+
+**Step 3**: During launch use `src/site/conf/properties-regexParserApplication.xml` as a custom configuration file; then verify
+the output by checking the HDFS file path configured in `properties-regexParserApplication.xml`.
diff --git a/tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/RegexParserApplication.java b/tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/RegexParserApplication.java
new file mode 100644
index 0000000..4085520
--- /dev/null
+++ b/tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/RegexParserApplication.java
@@ -0,0 +1,41 @@
+package com.datatorrent.tutorial.regexparser;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.parser.RegexParser;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.tutorial.csvparser.FileOutputOperator;
+
+/**
+ * Example application that connects a {@link ServerLogGenerator} to a {@link RegexParser} and routes the
+ * parser's {@code out} and {@code err} ports to two separate {@link FileOutputOperator} instances.
+ */
+@ApplicationAnnotation(name = "RegexParser")
+public class RegexParserApplication implements StreamingApplication
+{
+  /**
+   * Adds the operators and streams of this example to the DAG.
+   *
+   * @param dag the DAG to populate
+   * @param conf launch configuration; not read by this method
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    ServerLogGenerator logGenerator = dag.addOperator("logGenerator", ServerLogGenerator.class);
+    RegexParser regexParser = dag.addOperator("regexParser", RegexParser.class);
+    // Declare ServerLog as the tuple class of the parser's output port.
+    dag.setOutputPortAttribute(regexParser.out, Context.PortContext.TUPLE_CLASS, ServerLog.class);
+    FileOutputOperator regexWriter = dag.addOperator("regexWriter", FileOutputOperator.class);
+    FileOutputOperator regexErrorWriter = dag.addOperator("regexErrorWriter", FileOutputOperator.class);
+
+    dag.addStream("regexInput", logGenerator.outputPort, regexParser.in);
+    dag.addStream("regexOutput", regexParser.out, regexWriter.input);
+    dag.addStream("regexError", regexParser.err, regexErrorWriter.input);
+  }
+}
diff --git a/tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/ServerLog.java b/tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/ServerLog.java
new file mode 100644
index 0000000..1c37ca1
--- /dev/null
+++ b/tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/ServerLog.java
@@ -0,0 +1,103 @@
+package com.datatorrent.tutorial.regexparser;
+
+import java.util.Date;
+
+/**
+ * Plain Java bean holding the fields of one server log entry. {@link RegexParserApplication} registers
+ * this class as the tuple class of the parser's output port.
+ */
+public class ServerLog
+{
+  private Date date;
+  private int id;
+  private String signInId;
+  private String ipAddress;
+  private String serviceId;
+  private String accountId;
+  private String platform;
+
+
+  public int getId()
+  {
+    return id;
+  }
+
+  public void setId(int id)
+  {
+    this.id = id;
+  }
+
+  public Date getDate()
+  {
+    return date;
+  }
+
+  public void setDate(Date date)
+  {
+    this.date = date;
+  }
+
+  public String getSignInId()
+  {
+    return signInId;
+  }
+
+  public void setSignInId(String signInId)
+  {
+    this.signInId = signInId;
+  }
+
+  public String getIpAddress()
+  {
+    return ipAddress;
+  }
+
+  public void setIpAddress(String ipAddress)
+  {
+    this.ipAddress = ipAddress;
+  }
+
+  public String getServiceId()
+  {
+    return serviceId;
+  }
+
+  public void setServiceId(String serviceId)
+  {
+    this.serviceId = serviceId;
+  }
+
+  public String getAccountId()
+  {
+    return accountId;
+  }
+
+  public void setAccountId(String accountId)
+  {
+    this.accountId = accountId;
+  }
+
+  public String getPlatform()
+  {
+    return platform;
+  }
+
+  public void setPlatform(String platform)
+  {
+    this.platform = platform;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ServerLog{" +
+      "date=" + date +
+      ", id=" + id +
+      ", signInId='" + signInId + '\'' +
+      ", ipAddress='" + 
ipAddress + '\'' +
+      ", serviceId='" + serviceId + '\'' +
+      ", accountId='" + accountId + '\'' +
+      ", platform='" + platform + '\'' +
+      '}';
+  }
+}
diff --git a/tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/ServerLogGenerator.java b/tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/ServerLogGenerator.java
new file mode 100644
index 0000000..6364925
--- /dev/null
+++ b/tutorials/parser/src/main/java/com/datatorrent/tutorial/regexparser/ServerLogGenerator.java
@@ -0,0 +1,60 @@
+package com.datatorrent.tutorial.regexparser;
+
+import java.nio.charset.StandardCharsets;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Input operator that emits the same sample server log line, encoded as UTF-8 bytes, at most
+ * {@code tupleRate} times between two {@link #endWindow()} calls.
+ */
+public class ServerLogGenerator extends BaseOperator implements InputOperator
+{
+  /** Sample log line emitted for every tuple. */
+  private static final String SAMPLE_LOG_LINE =
+      "2015-10-01T03:14:49.000-07:00 lvn-d1-dev DevServer[9876]: INFO: [EVENT][SEQ=248717]" +
+      " 2015:10:01:03:14:49 101 sign-in_id=11111@psop.com ip_address=1.1.1.1 service_id=IP1234-NPB12345_00 " +
+      "result=RESULT_SUCCESconsole_id=0000000138e91b4e58236bf32besdafasdfasdfasdfsadf account_id=11111 platform=pik";
+
+  /** Port on which the encoded log lines are emitted. */
+  public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<byte[]>();
+
+  /** Maximum number of tuples emitted per window. */
+  private int tupleRate = 10;
+
+  /** Tuples emitted so far in the current window; reset in {@link #endWindow()}. */
+  private transient int tuplesEmittedInWindow = 0;
+
+  public int getTupleRate()
+  {
+    return tupleRate;
+  }
+
+  public void setTupleRate(int tupleRate)
+  {
+    this.tupleRate = tupleRate;
+  }
+
+  /**
+   * Emits sample log lines until {@code tupleRate} tuples have been emitted in the current window.
+   */
+  @Override
+  public void emitTuples()
+  {
+    while (tuplesEmittedInWindow < tupleRate) {
+      // Encode explicitly as UTF-8 so the emitted bytes do not depend on the platform default charset.
+      outputPort.emit(SAMPLE_LOG_LINE.getBytes(StandardCharsets.UTF_8));
+      tuplesEmittedInWindow++;
+    }
+  }
+
+  /** Resets the per-window tuple counter. */
+  @Override
+  public void endWindow()
+  {
+    tuplesEmittedInWindow = 0;
+    super.endWindow();
+  }
+}
diff --git a/tutorials/parser/src/site/conf/properties-regexParserApplication.xml b/tutorials/parser/src/site/conf/properties-regexParserApplication.xml
new file mode 100644
index 0000000..f759cf3
--- /dev/null
+++ b/tutorials/parser/src/site/conf/properties-regexParserApplication.xml
@@ 
-0,0 +1,63 @@
+<?xml version="1.0"?>
+<configuration>
+
+  <property>
+    <name>dt.application.RegexParser.operator.regexParser.prop.schema</name>
+    <value>{
+      "fields": [
+        {
+          "name": "date",
+          "type": "Date",
+          "constraints": {
+            "format": "yyyy:MM:dd:HH:mm:ss"
+          }
+        },
+        {
+          "name": "id",
+          "type": "Integer"
+        },
+        {
+          "name": "signInId",
+          "type": "String"
+
+        },
+        {
+          "name": "ipAddress",
+          "type": "String"
+        },
+        {
+          "name": "serviceId",
+          "type": "String"
+        },
+        {
+          "name": "accountId",
+          "type": "String"
+        },
+        {
+          "name": "platform",
+          "type": "String"
+        }
+      ]
+      }
+    </value>
+  </property>
+  <property>
+    <name>dt.application.RegexParser.operator.regexParser.prop.splitRegexPattern</name>
+    <value>.+\[SEQ=\w+\]\s*(\d+:[\d\d:]+)\s(\d+)\s* sign-in_id=(\S+) .*ip_address=(\S+).* service_id=(\S+).*account_id=(\S+).*platform=(\S+)</value>
+  </property>
+
+  <property>
+    <name>dt.application.RegexParser.operator.*.prop.filePath</name>
+    <value>/tmp/application/parser/regexparser</value>
+  </property>
+
+  <property>
+    <name>dt.application.RegexParser.operator.regexErrorWriter.prop.outputFileName</name>
+    <value>errordata</value>
+  </property>
+
+  <property>
+    <name>dt.application.RegexParser.operator.regexWriter.prop.outputFileName</name>
+    <value>outputdata</value>
+  </property>
+</configuration>
diff --git a/tutorials/parser/src/test/java/com/datatorrent/tutorial/regexparser/RegexParserApplicationTest.java b/tutorials/parser/src/test/java/com/datatorrent/tutorial/regexparser/RegexParserApplicationTest.java
new file mode 100644
index 0000000..2174bac
--- /dev/null
+++ b/tutorials/parser/src/test/java/com/datatorrent/tutorial/regexparser/RegexParserApplicationTest.java
@@ -0,0 +1,91 @@
+package com.datatorrent.tutorial.regexparser;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.StramLocalCluster;
+
+/**
+ * Runs {@link RegexParserApplication} in local mode and checks the lines written under the file name
+ * configured for the {@code regexWriter} operator.
+ */
+public class RegexParserApplicationTest
+{
+  private static final String FILE_PATH_KEY = "dt.application.RegexParser.operator.*.prop.filePath";
+  private static final String OUTPUT_FILE_NAME_KEY =
+      "dt.application.RegexParser.operator.regexWriter.prop.outputFileName";
+  private static final String TUPLE_RATE_KEY =
+      "dt.application.RegexParser.operator.logGenerator.prop.tupleRate";
+  private static final int TUPLE_RATE = 10;
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      conf.addResource(this.getClass().getResourceAsStream("/properties-regexParserApplication.xml"));
+      conf.setLong(TUPLE_RATE_KEY, TUPLE_RATE);
+      final File dataFolder = new File(conf.get(FILE_PATH_KEY));
+      final String dataFileName = conf.get(OUTPUT_FILE_NAME_KEY);
+
+      FileUtils.deleteDirectory(dataFolder);
+      lma.prepareDAG(new RegexParserApplication(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.setHeartbeatMonitoringEnabled(false);
+      ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+      {
+        @Override
+        public Boolean call() throws Exception
+        {
+          // The generator emits TUPLE_RATE tuples in every window, so the file can already hold more
+          // than one window's worth of lines when it is checked; stop as soon as enough are present.
+          return readOutputLines(dataFolder, dataFileName).length >= TUPLE_RATE;
+        }
+      });
+
+      lc.run(30 * 1000); // runs for 30 seconds and quits
+
+      String[] logData = readOutputLines(dataFolder, dataFileName);
+      Assert.assertTrue("no output found for " + dataFileName + "* in " + dataFolder, logData.length > 0);
+      for (String logLine : logData) {
+        Assert.assertTrue(logLine, logLine.contains("id=101"));
+        Assert.assertTrue(logLine, logLine.contains("signInId='11111@psop.com'"));
+        Assert.assertTrue(logLine, logLine.contains("serviceId='IP1234-NPB12345_00'"));
+        Assert.assertTrue(logLine, logLine.contains("accountId='11111'"));
+        Assert.assertTrue(logLine, logLine.contains("platform='pik'"));
+      }
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  /**
+   * Reads one file in {@code folder} whose name starts with {@code fileNamePrefix}.
+   *
+   * @return the file content split on newlines, or an empty array when the folder or file does not exist yet
+   */
+  private static String[] readOutputLines(File folder, String fileNamePrefix) throws IOException
+  {
+    if (!folder.exists()) {
+      return new String[0];
+    }
+    Collection<File> files = FileUtils.listFiles(folder, new WildcardFileFilter(fileNamePrefix + "*"), null);
+    if (files.isEmpty()) {
+      return new String[0];
+    }
+    return FileUtils.readFileToString(files.iterator().next(), "UTF-8").split("\n");
+  }
+}
diff --git a/tutorials/parser/src/test/resources/properties-regexParserApplication.xml b/tutorials/parser/src/test/resources/properties-regexParserApplication.xml
new file mode 100644
index 0000000..4091002
--- /dev/null
+++ b/tutorials/parser/src/test/resources/properties-regexParserApplication.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0"?>
+<configuration>
+
+  <property>
+    <name>dt.application.RegexParser.operator.regexParser.prop.schema</name>
+    <value>{
+      "fields": [
+        {
+          "name": "date",
+          "type": "Date",
+          "constraints": {
+            "format": "yyyy:MM:dd:HH:mm:ss"
+          }
+        },
+        {
+          "name": "id",
+          "type": "Integer"
+        },
+        {
+          "name": "signInId",
+          "type": "String"
+
+        },
+        {
+          "name": "ipAddress",
+          "type": "String"
+        },
+        {
+          "name": "serviceId",
+          "type": "String"
+        },
+        {
+          "name": "accountId",
+          "type": "String"
+        },
+        {
+          "name": "platform",
+          "type": "String"
+        }
+      ]
+      }
+    </value>
+  </property>
+  <property>
+    <name>dt.application.RegexParser.operator.regexParser.port.out.attr.TUPLE_CLASS</name>
+    <value>com.datatorrent.tutorial.regexparser.ServerLog</value>
+  </property>
+
+  <property>
+    <name>dt.application.RegexParser.operator.regexParser.prop.splitRegexPattern</name>
+    <value>.+\[SEQ=\w+\]\s*(\d+:[\d\d:]+)\s(\d+)\s* sign-in_id=(\S+) .*ip_address=(\S+).* service_id=(\S+).*account_id=(\S+).*platform=(\S+)</value>
+  </property>
+
+  <property>
+    <name>dt.application.RegexParser.operator.*.prop.filePath</name>
+    <value>/tmp/application/parser/regexparser</value>
+  </property>
+
+  <property>
+    <name>dt.application.RegexParser.operator.regexErrorWriter.prop.outputFileName</name>
+    <value>errordata</value>
+  </property>
+
+  <property>
+    <name>dt.application.RegexParser.operator.regexWriter.prop.outputFileName</name>
+    <value>outputdata</value>
+  </property>
+</configuration>