diff --git a/EtLast/Processes/StructuredBinaryTable/StructuredBinaryTableReader.cs b/EtLast/Processes/StructuredBinaryTable/StructuredBinaryTableReader.cs index bfb27208..d68c6a8b 100644 --- a/EtLast/Processes/StructuredBinaryTable/StructuredBinaryTableReader.cs +++ b/EtLast/Processes/StructuredBinaryTable/StructuredBinaryTableReader.cs @@ -1,4 +1,7 @@ -namespace FizzCode.EtLast; +using System.Buffers.Binary; +using System.IO.Compression; + +namespace FizzCode.EtLast; public sealed class StructuredBinaryTableReader : AbstractRowSource { @@ -37,6 +40,8 @@ protected override IEnumerable Produce() ? AddRowIndexToColumn : null; + var lenBuffer = new byte[4]; + var streamIndex = 0; foreach (var stream in streams) { @@ -46,52 +51,101 @@ protected override IEnumerable Produce() if (FlowState.IsTerminating) break; - BinaryReader reader = null; try { - reader = new BinaryReader(stream.Stream, Encoding.UTF8, leaveOpen: true); + int columnCount; + string[] columnNames; + string[] columnTypeNames; + BinaryTypeCode[] columnTypeCodes; - var formatVersion = reader.Read7BitEncodedInt(); + stream.Stream.ReadExactly(lenBuffer, 0, 4); + var len = BinaryPrimitives.ReadInt32LittleEndian(lenBuffer); + var buffer = new byte[len]; + stream.Stream.ReadExactly(buffer, 0, len); - var columnCount = reader.Read7BitEncodedInt(); - var columnNames = new string[columnCount]; - var columnTypeNames = new string[columnCount]; - var columnTypeCodes = new BinaryTypeCode[columnCount]; - for (var i = 0; i < columnCount; i++) + using (var headerUncompressed = new MemoryStream()) { - columnNames[i] = reader.ReadString(); - columnTypeNames[i] = reader.ReadString(); - columnTypeCodes[i] = (BinaryTypeCode)reader.ReadByte(); + using (var bufferMs = new MemoryStream(buffer)) + { + using (var ds = new DeflateStream(bufferMs, CompressionMode.Decompress, leaveOpen: true)) + { + ds.CopyTo(headerUncompressed); + } + } + + headerUncompressed.Position = 0; + + var headerReader = new BinaryReader(headerUncompressed, Encoding.UTF8, leaveOpen: false); + + var formatVersion = headerReader.Read7BitEncodedInt(); + + columnCount = headerReader.Read7BitEncodedInt(); + columnNames = new string[columnCount]; + columnTypeNames = new string[columnCount]; + columnTypeCodes = new BinaryTypeCode[columnCount]; + for (var i = 0; i < columnCount; i++) + { + columnNames[i] = headerReader.ReadString(); + columnTypeNames[i] = headerReader.ReadString(); + columnTypeCodes[i] = (BinaryTypeCode)headerReader.ReadByte(); + } + + headerReader.Dispose(); } while (stream.Stream.Position < stream.Stream.Length) { - for (var i = 0; i < columnCount; i++) + stream.Stream.ReadExactly(lenBuffer, 0, 4); + len = BinaryPrimitives.ReadInt32LittleEndian(lenBuffer); + buffer = new byte[len]; + stream.Stream.ReadExactly(buffer, 0, len); + + using (var ums = new MemoryStream()) { - var typeCode = (BinaryTypeCode)reader.ReadByte(); - if (typeCode != BinaryTypeCode._null) + using (var bufferMs = new MemoryStream(buffer)) + using (var ds = new DeflateStream(bufferMs, CompressionMode.Decompress, leaveOpen: true)) + ds.CopyTo(ums); + + ums.Position = 0; + + var reader = new BinaryReader(ums, Encoding.UTF8, leaveOpen: false); + try { - var value = BinaryTypeCodeEncoder.Read(reader, typeCode); - initialValues[columnNames[i]] = value; + while (ums.Position < ums.Length) + { + for (var i = 0; i < columnCount; i++) + { + var typeCode = (BinaryTypeCode)reader.ReadByte(); + if (typeCode != BinaryTypeCode._null) + { + var value = BinaryTypeCodeEncoder.Read(reader, typeCode); + initialValues[columnNames[i]] = value; + } + else + { + initialValues[columnNames[i]] = null; + } + } + + if (addStreamIndex != null) + initialValues[addStreamIndex] = streamIndex; + + if (addRowIndex != null) + initialValues[addRowIndex] = rowCount; + + rowCount++; + yield return Context.CreateRow(this, initialValues); + initialValues.Clear(); + + if (FlowState.IsTerminating) + break; + } } - else + finally { - initialValues[columnNames[i]] = null; + reader.Dispose(); } } - - if (addStreamIndex != null) - initialValues[addStreamIndex] = streamIndex; - - if (addRowIndex != null) - initialValues[addRowIndex] = rowCount; - - rowCount++; - yield return Context.CreateRow(this, initialValues); - initialValues.Clear(); - - if (FlowState.IsTerminating) - break; } } finally @@ -101,7 +155,6 @@ protected override IEnumerable Produce() stream.IoCommand.AffectedDataCount += rowCount; stream.IoCommand.End(); stream.Close(); - reader?.Dispose(); } } diff --git a/EtLast/Processes/StructuredBinaryTable/WriteToStructuredBinaryTableMutator.cs b/EtLast/Processes/StructuredBinaryTable/WriteToStructuredBinaryTableMutator.cs index b4df0b6b..8f1f9847 100644 --- a/EtLast/Processes/StructuredBinaryTable/WriteToStructuredBinaryTableMutator.cs +++ b/EtLast/Processes/StructuredBinaryTable/WriteToStructuredBinaryTableMutator.cs @@ -1,4 +1,7 @@ -namespace FizzCode.EtLast; +using System.Buffers.Binary; +using System.IO.Compression; + +namespace FizzCode.EtLast; public sealed class WriteToStructuredBinaryTableMutator : AbstractMutator, IRowSink { @@ -8,7 +11,7 @@ public sealed class WriteToStructuredBinaryTableMutator : AbstractMutator, IRowS /// /// Default value is 10000 /// - public int BatchSize { get; init; } = 10000; + public int BufferSizeBytes { get; init; } = 10000; private SinkEntry _sinkEntry; @@ -114,7 +117,7 @@ protected override IEnumerable MutateRow(IRow row, long rowInputIndex) sinkEntry.RowCount++; - if (sinkEntry.RowCount >= BatchSize) + if (sinkEntry.Buffer.Length >= BufferSizeBytes) WriteBuffer(); } catch (Exception ex) @@ -129,14 +132,29 @@ protected override IEnumerable MutateRow(IRow row, long rowInputIndex) private void WriteBuffer() { - if (_sinkEntry.RowCount == 0) + if (_sinkEntry.Buffer.Position == 0) return; _sinkEntry.BufferWriter.Flush(); - var data = _sinkEntry.Buffer.ToArray(); - _sinkEntry.NamedSink.Stream.Write(data, 0, data.Length); - _sinkEntry.NamedSink.Sink.IncreaseBytes(data.Length); + using (var targetMs = new MemoryStream()) + { + using (var ds = new DeflateStream(targetMs, CompressionLevel.Fastest)) + { + _sinkEntry.Buffer.Position = 0; + _sinkEntry.Buffer.CopyTo(ds); + } + + var data = targetMs.ToArray(); + + var buffer = new byte[4]; + BinaryPrimitives.WriteInt32LittleEndian(buffer, data.Length); + _sinkEntry.NamedSink.Stream.Write(buffer); + + _sinkEntry.NamedSink.Stream.Write(data, 0, data.Length); + _sinkEntry.NamedSink.Sink.IncreaseBytes(data.Length); + } + _sinkEntry.RowCount = 0; _sinkEntry.Buffer.SetLength(0); } diff --git a/Tests/EtLast.Tests.Unit/Tests/StructuredBinaryTable/StructuredBinaryTableTests.cs b/Tests/EtLast.Tests.Unit/Tests/StructuredBinaryTable/StructuredBinaryTableTests.cs index 0e3121b4..08910d59 100644 --- a/Tests/EtLast.Tests.Unit/Tests/StructuredBinaryTable/StructuredBinaryTableTests.cs +++ b/Tests/EtLast.Tests.Unit/Tests/StructuredBinaryTable/StructuredBinaryTableTests.cs @@ -146,8 +146,6 @@ public void WriteThenReadBackTest_RandomValuesTest() [TestMethod] public void StreamDisposeTest() { - var memoryStream = new MemoryStream(); - var context = TestExecuter.GetContext(); var rows = TestData.Person().TakeRowsAndReleaseOwnership(null); @@ -199,4 +197,67 @@ public void StreamDisposeTest() File.Delete(fileName); } + + [TestMethod] + public void EndToEndTest() + { + var context = TestExecuter.GetContext(); + + var rows = TestData.Person().TakeRowsAndReleaseOwnership(null); + + var rowCache = SequenceBuilder.Fluent + .ReadFrom(TestData.Person()) + .BuildToInMemoryRowCache(); + + var fileName = Path.GetTempFileName(); + + SequenceBuilder.Fluent + .ReadFromInMemoryRowCache(rowCache) + .WriteToStructuredBinaryTable(new WriteToStructuredBinaryTableMutator() + { + DynamicColumns = () => new() + { + ["id"] = typeof(int), + ["name"] = typeof(string), + ["age"] = typeof(int), + ["height"] = typeof(int), + ["eyeColor"] = typeof(string), + ["countryId"] = typeof(int), + ["birthDate"] = typeof(DateTime), + ["lastChangedTime"] = typeof(DateTime), + }, + SinkProvider = new LocalFileSinkProvider() + { + Path = fileName, + ActionWhenFileExists = LocalSinkFileExistsAction.Overwrite, + FileMode = FileMode.Create, + }, + }) + .Build() + .Execute(context); + + context = TestExecuter.GetContext(); + var builder = SequenceBuilder.Fluent + .ReadStructuredBinaryTable(new StructuredBinaryTableReader() + { + StreamProvider = new LocalFileStreamProvider() + { + Path = fileName, + }, + }); + + var result = TestExecuter.Execute(context, builder); + Assert.AreEqual(7, result.MutatedRows.Count); + Assert.That.ExactMatch(result.MutatedRows, [ + new() { ["id"] = 0, ["name"] = "A", ["age"] = 17, ["height"] = 160, ["eyeColor"] = "brown", ["countryId"] = 1, ["birthDate"] = new DateTime(2010, 12, 9, 0, 0, 0, 0), ["lastChangedTime"] = new DateTime(2015, 12, 19, 12, 0, 1, 0) }, + new() { ["id"] = 1, ["name"] = "B", ["age"] = 8, ["height"] = 190, ["eyeColor"] = null, ["countryId"] = 1, ["birthDate"] = new DateTime(2011, 2, 1, 0, 0, 0, 0), ["lastChangedTime"] = new DateTime(2015, 12, 19, 13, 2, 0, 0) }, + new() { ["id"] = 2, ["name"] = "C", ["age"] = 27, ["height"] = 170, ["eyeColor"] = "green", ["countryId"] = 2, ["birthDate"] = new DateTime(2014, 1, 21, 0, 0, 0, 0), ["lastChangedTime"] = new DateTime(2015, 11, 21, 17, 11, 58, 0) }, + new() { ["id"] = 3, ["name"] = "D", ["age"] = 39, ["height"] = 160, ["eyeColor"] = "fake", ["countryId"] = null, ["birthDate"] = "2018.07.11", ["lastChangedTime"] = new DateTime(2017, 8, 1, 4, 9, 1, 0) }, + new() { ["id"] = 4, ["name"] = "E", ["age"] = -3, ["height"] = 160, ["eyeColor"] = null, ["countryId"] = 1, ["birthDate"] = null, ["lastChangedTime"] = new DateTime(2019, 1, 1, 23, 59, 59, 0) }, + new() { ["id"] = 5, ["name"] = "A", ["age"] = 11, ["height"] = 140, ["eyeColor"] = null, ["countryId"] = null, ["birthDate"] = new DateTime(2013, 5, 15, 0, 0, 0, 0), ["lastChangedTime"] = new DateTime(2018, 1, 1, 0, 0, 0, 0) }, + new() { ["id"] = 6, ["name"] = "fake", ["age"] = null, ["height"] = 140, ["eyeColor"] = null, ["countryId"] = 5, ["birthDate"] = new DateTime(2018, 1, 9, 0, 0, 0, 0), ["lastChangedTime"] = null }]); + Assert.AreEqual(0, result.Process.FlowState.Exceptions.Count); + + File.Delete(fileName); + } } \ No newline at end of file