Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for exporting data in row batch mode #759

Merged
merged 6 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/Infrastructure/AppEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ static AppEnvironment()
MsalTokenCacheFilePath = Path.Combine(ApplicationDataPath, ".msalcache");
WebView2VersionInfo = WebView2Helper.GetRuntimeVersionInfo();

Diagnostics = new ConcurrentDictionary<string, DiagnosticMessage>();
Diagnostics = new ConcurrentDictionary<DiagnosticMessage, DiagnosticMessage>();
DefaultJsonOptions = new(JsonSerializerDefaults.Web) { MaxDepth = 32 }; // see Microsoft.AspNetCore.Mvc.JsonOptions.JsonSerializerOptions

var spaceChars = new[]
Expand Down Expand Up @@ -184,7 +184,7 @@ public static RegistryKey? ApplicationInstallerRegistryHKey

public static bool IsDiagnosticLevelVerbose => UserPreferences.Current.DiagnosticLevel == DiagnosticLevelType.Verbose;

public static ConcurrentDictionary<string, DiagnosticMessage> Diagnostics { get; }
public static ConcurrentDictionary<DiagnosticMessage, DiagnosticMessage> Diagnostics { get; }

public static void AddDiagnostics(string name, Exception exception, DiagnosticMessageSeverity severity = DiagnosticMessageSeverity.Error)
{
Expand All @@ -195,8 +195,8 @@ public static void AddDiagnostics(string name, Exception exception, DiagnosticMe
public static void AddDiagnostics(DiagnosticMessageType type, string name, string content, DiagnosticMessageSeverity severity = DiagnosticMessageSeverity.None, bool writeFile = false)
{
var message = DiagnosticMessage.Create(type, severity, name, content);

Diagnostics.AddOrUpdate(message.Name!, message, (key, value) => message);
_= Diagnostics.TryAdd(message, message);

if (writeFile)
{
Expand Down
19 changes: 19 additions & 0 deletions src/Infrastructure/Extensions/DataReaderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace Sqlbi.Bravo.Infrastructure.Extensions
{
    using System.Collections.Generic;
    using System;
    using System.Data;

    /// <summary>
    /// LINQ-style helpers for forward-only <see cref="IDataReader"/> instances.
    /// </summary>
    internal static class DataReaderExtensions
    {
        /// <summary>
        /// Lazily projects every remaining row of <paramref name="reader"/> through <paramref name="selector"/>.
        /// </summary>
        /// <typeparam name="T">Type of the projected items.</typeparam>
        /// <param name="reader">Reader positioned before the first row to project.</param>
        /// <param name="selector">Projection invoked once per row with the reader positioned on that row.</param>
        /// <returns>
        /// A deferred sequence. Nothing is read until enumeration starts, and the sequence can be enumerated
        /// only once because the reader is forward-only and is closed when the enumeration ends.
        /// </returns>
        /// <remarks>
        /// The reader is closed when the enumerator finishes or is disposed, which also covers consumers that
        /// stop early (e.g. <c>First()</c>, <c>break</c>) and a <paramref name="selector"/> that throws.
        /// </remarks>
        public static IEnumerable<T> Select<T>(this IDataReader reader, Func<IDataReader, T> selector)
        {
            try
            {
                while (reader.Read())
                {
                    yield return selector(reader);
                }
            }
            finally
            {
                // The finally block of an iterator runs on normal completion and on enumerator disposal,
                // so the reader is released even when the sequence is only partially consumed.
                reader.Close();
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,8 @@
using System.Data;
using System.Linq;

internal static class LinqExtensions
internal static class EnumerableExtensions
{
public static IEnumerable<T> Select<T>(this IDataReader reader, Func<IDataReader, T> selector)
{
while (reader.Read())
{
yield return selector(reader);
}

reader.Close();
}

public static IEnumerable<(T item, int index)> WithIndex<T>(this IEnumerable<T> source)
{
return source.Select((item, index) => (item, index));
Expand Down
5 changes: 5 additions & 0 deletions src/Infrastructure/Services/ConnectionWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,15 @@ private AdomdConnectionWrapper(string connectionString, string databaseName)
Connection = new AdomdConnection(connectionString.ToUnprotectedString());
ProcessHelper.RunOnUISynchronizationContext(() => Connection.Open());
Connection.ChangeDatabase(databaseName);

// TOPNSKIP is supported since Analysis Services 2016 version 13 despite being an undocumented feature (not included in MDSCHEMA_FUNCTIONS)
IsDaxFunctionTopNSkipSupported = Version.TryParse(Connection.ServerVersion, out var version) && version >= new Version(13, 0);
}

public AdomdConnection Connection { get; }

public bool IsDaxFunctionTopNSkipSupported { get; }

public AdomdCommand CreateAdomdCommand()
{
var command = Connection.CreateCommand();
Expand Down
2 changes: 1 addition & 1 deletion src/Models/DiagnosticMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class DiagnosticMessage
[JsonIgnore]
public DateTime? ReadTimestamp { get; set; }

public static DiagnosticMessage Create(DiagnosticMessageType type, DiagnosticMessageSeverity severity, string name, string content)
internal static DiagnosticMessage Create(DiagnosticMessageType type, DiagnosticMessageSeverity severity, string name, string content)
{
var message = new DiagnosticMessage
{
Expand Down
3 changes: 3 additions & 0 deletions src/Models/ExportData/ExportDataEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class ExportDataTable : ExportDataEntity
[JsonPropertyName("rows")]
public int Rows { get; set; } = 0;

[JsonPropertyName("columns")]
public int Columns { get; set; } = 0;

public void SetTruncated() => Status = ExportDataStatus.Truncated;
}

Expand Down
168 changes: 120 additions & 48 deletions src/Services/ExportDataService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using CsvHelper.Configuration;
using CsvHelper.TypeConversion;
using LargeXlsx;
using Microsoft.AnalysisServices.AdomdClient;
using Sqlbi.Bravo.Infrastructure;
using Sqlbi.Bravo.Infrastructure.Extensions;
using Sqlbi.Bravo.Infrastructure.Helpers;
Expand Down Expand Up @@ -37,6 +38,9 @@ public interface IExportDataService

internal class ExportDataService : IExportDataService
{
private const int BatchSize = 10_000;
private const int ExcelMaxRows = 1_000_000; // Excel cannot exceed the limit of 1,048,576 rows and 16,384 columns

private static readonly TypeConverterOptions _defaultDelimitedTextTypeConverterOptions = new()
{
Formats = new[]
Expand Down Expand Up @@ -196,51 +200,79 @@ private static void ExportDelimitedTextFileImpl(ExportDataJob job, ExportDelimit
cancellationToken.ThrowIfCancellationRequested();

var table = job.AddNew(tableName);
{
// TODO: if the SSAS instance supports TOPNSKIP then use that to query batches of rows
command.CommandText = $"EVALUATE { TabularModelHelper.GetDaxTableName(tableName) }";
var fileTableName = tableName.ReplaceInvalidFileNameChars();
var fileName = Path.ChangeExtension(fileTableName, "csv");
var path = Path.Combine(settings.ExportPath, fileName);

Encoding encoding = settings.UnicodeEncoding
? new UnicodeEncoding()
: new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);

using var streamWriter = new StreamWriter(path, append: false, encoding);
using var csvWriter = new CsvWriter(streamWriter, config);

Export(command, table, csvWriter, settings.QuoteStringFields, rowBatchMode: connection.IsDaxFunctionTopNSkipSupported, cancellationToken);
}

var fileTableName = tableName.ReplaceInvalidFileNameChars();
var fileName = Path.ChangeExtension(fileTableName, "csv");
var path = Path.Combine(settings.ExportPath, fileName);
static void Export(AdomdCommand command, ExportDataTable table, CsvWriter writer, bool quoteStringFields, bool rowBatchMode, CancellationToken cancellationToken)
{
var tableName = TabularModelHelper.GetDaxTableName(table.Name);

Encoding encoding = settings.UnicodeEncoding
? new UnicodeEncoding()
: new UTF8Encoding(encoderShouldEmitUTF8Identifier: false);
if (AppEnvironment.IsDiagnosticLevelVerbose)
AppEnvironment.AddDiagnostics(DiagnosticMessageType.Text, name: $"{nameof(ExportDataService)}.{nameof(ExportDelimitedTextFileImpl)}.{nameof(Export)}", content: $"Export table {tableName} in {(rowBatchMode ? "row batch" : "full")} mode");

using var streamWriter = new StreamWriter(path, append: false, encoding);
using var csvWriter = new CsvWriter(streamWriter, config);
if (rowBatchMode)
{
var batchCount = 0;
do
{
// Sort order based on RowNumber column. Order changes when the table is refreshed.
command.CommandText = $"EVALUATE TOPNSKIP({BatchSize}, {batchCount++ * BatchSize}, {tableName})";
using var dataReader = command.ExecuteReader(CommandBehavior.SingleResult);

if (batchCount == 1) WriteHeader(table, writer, dataReader, quoteStringFields);
var batchRows = WriteData(table, writer, dataReader, quoteStringFields, cancellationToken);
if (batchRows == 0)
break;
}
while (true);
}
else
{
command.CommandText = $"EVALUATE {tableName}";
using var dataReader = command.ExecuteReader(CommandBehavior.SingleResult);
//using var dataReader = CreateTestDataReader();

WriteData(table, csvWriter, dataReader, settings.QuoteStringFields, cancellationToken);
WriteHeader(table, writer, dataReader, quoteStringFields);
WriteData(table, writer, dataReader, quoteStringFields, cancellationToken);
}

table.SetCompleted();
}

static void WriteData(ExportDataTable table, CsvWriter writer, IDataReader reader, bool quoteStringFields, CancellationToken cancellationToken)
static void WriteHeader(ExportDataTable table, CsvWriter writer, IDataReader reader, bool quoteStringFields)
{
// output dates using ISO 8601 format
writer.Context.TypeConverterOptionsCache.AddOptions(typeof(DateTime), options: _defaultDelimitedTextTypeConverterOptions);
table.Columns = reader.FieldCount;

// write header
for (int i = 0; i < reader.FieldCount; i++)
for (var i = 0; i < reader.FieldCount; i++)
{
var stringField = GetDaxColumnName(reader, i);
var columnName = GetDaxColumnName(reader, i);

if (quoteStringFields)
writer.WriteField(stringField, shouldQuote: true);
writer.WriteField(columnName, shouldQuote: true);
else
writer.WriteField(stringField); // use default ConfigurationFunctions.ShouldQuote()
writer.WriteField(columnName); // use default ConfigurationFunctions.ShouldQuote()
}

writer.NextRecord();
}

// write data
static int WriteData(ExportDataTable table, CsvWriter writer, IDataReader reader, bool quoteStringFields, CancellationToken cancellationToken)
{
var rows = 0;
while (reader.Read())
{
cancellationToken.ThrowIfCancellationRequested();

for (var i = 0; i < reader.FieldCount; i++)
{
var field = reader[i];
Expand All @@ -260,9 +292,14 @@ static void WriteData(ExportDataTable table, CsvWriter writer, IDataReader reade
}
}

rows++;
table.Rows++;
writer.NextRecord();

if (table.Rows % 1_000 == 0)
cancellationToken.ThrowIfCancellationRequested();
}
return rows;
}
}

Expand All @@ -284,20 +321,11 @@ private static void ExportExcelFileImpl(ExportDataJob job, ExportExcelSettings s
cancellationToken.ThrowIfCancellationRequested();

var table = job.AddNew(tableName);
{
// TODO: if the SSAS instance supports TOPNSKIP then use that to query batches of rows
command.CommandText = $"EVALUATE { TabularModelHelper.GetDaxTableName(tableName) }";

using var dataReader = command.ExecuteReader(CommandBehavior.SingleResult);
//using var dataReader = CreateTestDataReader();
var worksheetName = GetWorksheetName(tableName, tableIndex);

xlsxWriter.BeginWorksheet(worksheetName, splitRow: 1);
{
WriteData(table, xlsxWriter, dataReader, cancellationToken);
}
xlsxWriter.SetAutoFilter(fromRow: 1, fromColumn: 1, rowCount: xlsxWriter.CurrentRowNumber, columnCount: dataReader.FieldCount);
}
var worksheetName = GetWorksheetName(tableName, tableIndex);

xlsxWriter.BeginWorksheet(worksheetName, splitRow: 1);
Export(command, table, xlsxWriter, rowBatchMode: connection.IsDaxFunctionTopNSkipSupported, cancellationToken);
xlsxWriter.SetAutoFilter(fromRow: 1, fromColumn: 1, rowCount: table.Rows, columnCount: table.Columns);
}

WriteSummary(job, settings, xlsxWriter);
Expand Down Expand Up @@ -354,7 +382,43 @@ static string GetWorksheetName(string tableName, int tableIndex)
return worksheetName;
}

static void WriteData(ExportDataTable table, XlsxWriter writer, IDataReader reader, CancellationToken cancellationToken)
// Runs the DAX query for a single table and streams the result into the current worksheet.
// In row batch mode the table is fetched with consecutive TOPNSKIP queries of BatchSize rows each;
// otherwise a single EVALUATE over the whole table is issued.
// The table is marked as completed only if its status is still Running when the export ends.
static void Export(AdomdCommand command, ExportDataTable table, XlsxWriter writer, bool rowBatchMode, CancellationToken cancellationToken)
{
    var daxTableName = TabularModelHelper.GetDaxTableName(table.Name);

    if (AppEnvironment.IsDiagnosticLevelVerbose)
    {
        var modeName = rowBatchMode ? "row batch" : "full";
        AppEnvironment.AddDiagnostics(DiagnosticMessageType.Text, name: $"{nameof(ExportDataService)}.{nameof(ExportExcelFileImpl)}.{nameof(Export)}", content: $"Export table {daxTableName} in {modeName} mode");
    }

    if (!rowBatchMode)
    {
        // Full mode: one query, one reader, header followed by every row.
        command.CommandText = $"EVALUATE {daxTableName}";
        using var fullReader = command.ExecuteReader(CommandBehavior.SingleResult);

        WriteHeader(table, writer, fullReader);
        WriteData(table, writer, fullReader, cancellationToken);
    }
    else
    {
        for (var batchIndex = 0; ; batchIndex++)
        {
            // Sort order based on RowNumber column. Order changes when the table is refreshed.
            var skippedRows = batchIndex * BatchSize;
            command.CommandText = $"EVALUATE TOPNSKIP({BatchSize}, {skippedRows}, {daxTableName})";
            using var batchReader = command.ExecuteReader(CommandBehavior.SingleResult);

            // The header is written once, from the first batch reader.
            if (batchIndex == 0)
                WriteHeader(table, writer, batchReader);

            var writtenRows = WriteData(table, writer, batchReader, cancellationToken);

            // Stop when a batch comes back empty or the table has been flagged as truncated.
            var exhausted = writtenRows == 0;
            if (exhausted || table.Status == ExportDataStatus.Truncated)
                break;
        }
    }

    if (table.Status == ExportDataStatus.Running)
        table.SetCompleted();
}

static void WriteHeader(ExportDataTable table, XlsxWriter writer, IDataReader reader)
{
// TODO: improve the column format(XlsxStyle) by using the TOM.Column.FormatString property (see DaxStudio.UI.Utils.XlsxHelper.GetStyle)

Expand All @@ -364,23 +428,27 @@ static void WriteData(ExportDataTable table, XlsxWriter writer, IDataReader read
border: XlsxStyle.Default.Border,
numberFormat: XlsxStyle.Default.NumberFormat,
alignment: XlsxAlignment.Default);
var dateTimeStyle = XlsxStyle.Default.With(new XlsxNumberFormat($"yyyy-mm-dd hh:mm:ss"));

// write header
writer.SetDefaultStyle(headerStyle).BeginRow();
table.Columns = reader.FieldCount;

for (var i = 0; i < reader.FieldCount; i++)
{
var daxColumnName = GetDaxColumnName(reader, i);
writer.Write(daxColumnName);
var columnName = GetDaxColumnName(reader, i);
writer.Write(columnName);
}

// write data
// restore default style
writer.SetDefaultStyle(XlsxStyle.Default);
}

static int WriteData(ExportDataTable table, XlsxWriter writer, IDataReader reader, CancellationToken cancellationToken)
{
var dateTimeStyle = XlsxStyle.Default.With(new XlsxNumberFormat($"yyyy-mm-dd hh:mm:ss"));
var rows = 0;

while (reader.Read())
{
cancellationToken.ThrowIfCancellationRequested();
writer.BeginRow();

for (var i = 0; i < reader.FieldCount; i++)
Expand Down Expand Up @@ -419,15 +487,19 @@ static void WriteData(ExportDataTable table, XlsxWriter writer, IDataReader read
}
}

if (++table.Rows >= 1_000_000)
{
// Break and exit if we have reached the limit of an xlsx file
rows++;

if (++table.Rows >= ExcelMaxRows)
{
table.SetTruncated();
return;
return rows;
}

if (table.Rows % 1_000 == 0)
cancellationToken.ThrowIfCancellationRequested();
}

table.SetCompleted();
return rows;
}

static void WriteSummary(ExportDataJob job, ExportExcelSettings settings, XlsxWriter writer)
Expand Down
Loading