Skip to content

Two flavors of IDataReader implementations used together with SqlBulkCopy to insert static and dynamic data structures into SQL Server table.

Notifications You must be signed in to change notification settings

yuramag/BulkInsertDemo

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Using SqlBulkCopy with IDataReader for Hi Perf Inserts

Introduction

Eventually, SqlBulkCopy can be used in three flavors: inserting data represented as a DataTable object, array of DataRow objects, or IDataReader instance. In this article, I will demonstrate two implementations of IDataReader interface used in conjunction with SqlBulkCopy for high performance database inserts. The other two options are similar to each other and can be used for relatively small amounts of data because they require all records to be pre-loaded into memory before handing them over to SqlBulkCopy. In contrast, the IDataReader approach is more flexible and allows working with unlimited number of records in "lazy" mode, meaning that data can be fed to the SqlBulkCopy on the fly as fast as a server can consume it. This is analogous to IList<T> vs IEnumerable<T> approach.

Using the Demo

The attached demo project consists of a pre-compiled console application with config file and Data sub-folder containing sample CSV file. Before running the demo, make sure to adjust config file specifying correct connection string named "DefaultDb". Another setting "MaxRecordCount" is equal to 100,000 by default, which should be OK for this demo. Note that the connection string can point to any existing database. All demo tables will be created automatically, so there is no need to set up the database manually.

After launching the demo, it will show up in a console window asking to press Enter before initializing the database and before executing every demo action.

As a first step, the application will attempt to initialize the database. It will create (or recreate) three tables - one for each demo action:

  1. Contacts table with Id, FirstName, LastName, and BirthDate columns
  2. DynamicData table with an Id, 10 integer, 10 string, 10 datetime, and 10 guid columns
  3. CsvData table having the same structure as DynamicData

Then the app will execute three demo actions measuring time for each action:

  1. Static Dataset Demo demonstrates ObjectDataReader<T> that allows to process instances of any POCO class (Contact class in this case).
  2. Dynamic Dataset Demo demonstrates DynamicDataReader<T> that also implements IDataReader, but allows user to decide how to extract data from the underlying object of T through the user defined lambda expression. In this demo, I use IDictionary<string, object> to represent the data.
  3. CSV Import Demo utilizes CsvParser class and above-mentioned DynamicDataReader<T> to efficiently load attached "Data\CsvData.csv" file into the database.
CsvParser implementation is described in one of my other articles here.

The data for the first two demos is randomly generated on the fly using helper class RandomDataGenerator. Another helper class TableSchemaProvider is used to extract some metadata from SQL Server and execute some utility SQL commands.

ObjectDataReader<T>

As shown below, ObjectDataReader<T> accepts IEnumerable<T> in its constructor, which represents the stream of actual data to be consumed by SqlBulkCopy class. It is important to note that GetOrdinal() and GetValue() methods do not use reflection every time they need to access properties of T. Instead, they use pre-compiled and cached lambda expressions that play the role of property accessors and lookups. These pre-compiled lambda expressions are many times faster than using reflection.

public sealed class ObjectDataReader<TData> : IDataReader
{
    private class PropertyAccessor
    {
        public List<Func<TData, object>> Accessors { get; set; }
        public Dictionary<string, int> Lookup { get; set; }
    }

    private static readonly Lazy<PropertyAccessor> s_propertyAccessorCache =
        new Lazy<PropertyAccessor>(() =>
    {
        var propertyAccessors = typeof(TData)
            .GetProperties(BindingFlags.Instance | BindingFlags.Public)
            .Where(p => p.CanRead)
            .Select((p, i) => new
            {
                Index = i,
                Property = p,
                Accessor = CreatePropertyAccessor(p)
            })
            .ToArray();

        return new PropertyAccessor
        {
            Accessors = propertyAccessors.Select(p => p.Accessor).ToList(),
            Lookup = propertyAccessors.ToDictionary(
                p => p.Property.Name, p => p.Index, StringComparer.OrdinalIgnoreCase)
        };
    });

    private static Func<TData, object> CreatePropertyAccessor(PropertyInfo p)
    {
        var parameter = Expression.Parameter(typeof(TData), "input");
        var propertyAccess = Expression.Property(parameter, p.GetGetMethod());
        var castAsObject = Expression.TypeAs(propertyAccess, typeof(object));
        var lamda = Expression.Lambda<Func<TData, object>>(castAsObject, parameter);
        return lamda.Compile();
    }

    private IEnumerator<TData> m_dataEnumerator;

    public ObjectDataReader(IEnumerable<TData> data)
    {
        m_dataEnumerator = data.GetEnumerator();
    }

    #region IDataReader Members

    public void Close()
    {
        Dispose();
    }

    public int Depth => 1;

    public DataTable GetSchemaTable()
    {
        return null;
    }

    public bool IsClosed => m_dataEnumerator == null;

    public bool NextResult()
    {
        return false;
    }

    public bool Read()
    {
        if (IsClosed)
            throw new ObjectDisposedException(GetType().Name);
        return m_dataEnumerator.MoveNext();
    }

    public int RecordsAffected => -1;

    #endregion

    // IDisposable Members

    #region IDataRecord Members

    public int GetOrdinal(string name)
    {
        int ordinal;
        if (!s_propertyAccessorCache.Value.Lookup.TryGetValue(name, out ordinal))
            throw new InvalidOperationException("Unknown parameter name: " + name);
        return ordinal;
    }

    public object GetValue(int i)
    {
        if (m_dataEnumerator == null)
            throw new ObjectDisposedException(GetType().Name);
        return s_propertyAccessorCache.Value.Accessors[i](m_dataEnumerator.Current);
    }

    public int FieldCount => s_propertyAccessorCache.Value.Accessors.Count;

    // Not Implemented IDataRecord Members ...
        
    #endregion
}

Once we have ObjectDataReader<T> implemented, we can plug it into SqlBulkCopy as follows:

private static async Task RunStaticDatasetDemoAsync(SqlConnection connection, int count, 
    CancellationToken cancellationToken)
{
    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        bulkCopy.DestinationTableName = "Contacts";
        bulkCopy.BatchSize = 1000;
        bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;

        bulkCopy.ColumnMappings.Add("Id", "Id");
        bulkCopy.ColumnMappings.Add("FirstName", "FirstName");
        bulkCopy.ColumnMappings.Add("LastName", "LastName");
        bulkCopy.ColumnMappings.Add("BirthDate", "BirthDate");

        using (var reader = new ObjectDataReader<Contact>(new RandomDataGenerator().GetContacts(count)))
            await bulkCopy.WriteToServerAsync(reader, cancellationToken);
    }
}

DynamicDataReader<T>

You can use DynamicDataReader<T> if there is no statically defined class that represents the data. The best example that illustrates the purpose of DynamicDataReader<T> is when every record of your table is represented as Dictionary<string, object> where the keys are column names. This way, if there is no value for a given column in the dictionary, Null value will be assumed. Conversely, all items in the dictionary that are not associated with any column in the table, will be ignored.

public sealed class DynamicDataReader<T> : IDataReader
{
    private readonly IList<SchemaFieldDef> m_schema;
    private readonly IDictionary<string, int> m_schemaMapping;
    private readonly Func<T, string, object> m_selector;
    private IEnumerator<T> m_dataEnumerator;

    public DynamicDataReader(IList<SchemaFieldDef> schema, IEnumerable<T> data, 
        Func<T, string, object> selector)
    {
        m_schema = schema;
        m_schemaMapping = m_schema
            .Select((x, i) => new { x.FieldName, Index = i })
            .ToDictionary(x => x.FieldName, x => x.Index);
        m_selector = selector;
        m_dataEnumerator = data.GetEnumerator();
    }

    #region IDataReader Members

    public void Close()
    {
        Dispose();
    }

    public int Depth => 1;

    public DataTable GetSchemaTable()
    {
        return null;
    }

    public bool IsClosed => m_dataEnumerator == null;

    public bool NextResult()
    {
        return false;
    }

    public bool Read()
    {
        if (IsClosed)
            throw new ObjectDisposedException(GetType().Name);
        return m_dataEnumerator.MoveNext();
    }

    public int RecordsAffected => -1;

    #endregion

    // IDisposable Members

    #region IDataRecord Members

    public int FieldCount => m_schema.Count;

    public int GetOrdinal(string name)
    {
        int ordinal;
        if (!m_schemaMapping.TryGetValue(name, out ordinal))
            throw new InvalidOperationException("Unknown parameter name: " + name);
        return ordinal;
    }

    public object GetValue(int i)
    {
        if (m_dataEnumerator == null)
            throw new ObjectDisposedException(GetType().Name);

        var value = m_selector(m_dataEnumerator.Current, m_schema[i].FieldName);

        if (value == null)
            return DBNull.Value;

        var strValue = value as string;
        if (strValue != null)
        {
            if (strValue.Length > m_schema[i].Size && m_schema[i].Size > 0)
                strValue = strValue.Substring(0, m_schema[i].Size);
            if (m_schema[i].DataType == DbType.String)
                return strValue;
            return SchemaFieldDef.StringToTypedValue(strValue, m_schema[i].DataType) ?? DBNull.Value;
        }

        return value;
    }

    // Not Implemented IDataRecord Members

    #endregion
}

DynamicDataReader<T> relays on SchemaFieldDef class that describes Field Name, Size, and DB Data Type of a table column. Only those columns that were passed via constructor (IList<SchemaFieldDef> schema) will participate in data inserts. The other two parameter of a constructor represent the data itself (IEnumerable<T> data), and the user defined lambda expression (Func<T, string, object> selector) to access properties. As you can see, selector accepts the instance of T and a string field name, and returns object back that represents the value associated with that field name. Note that object's data type can either be a non-string C# type (int, decimal, DateTime, Guid, etc.) corresponding to the actual type in database (int, numeric, datetime, uniqueidentifier, etc.), or simply a string. In latter case, DynamicDataReader will attempt to convert a string value into an appropriate data type automatically, with the help of SchemaFieldDef.StringToTypedValue() method. This method supports only a few data type, but can be easily extended if needed.

Here is an example of using DynamicDataReader<T> together with SqlBulkCopy:

private static async Task RunDynamicDatasetDemoAsync(SqlConnection connection, int count, 
    CancellationToken cancellationToken)
{
    var fields = await new TableSchemaProvider(connection, "DynamicData").GetFieldsAsync();

    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        bulkCopy.DestinationTableName = "DynamicData";
        bulkCopy.BatchSize = 1000;
        bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;

        foreach (var field in fields)
            bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);

        var data = new RandomDataGenerator().GetDynamicData(count);

        using (var reader = new DynamicDataReader<IDictionary<string, object>>
				(fields, data, (x, k) => x.GetValueOrDefault(k)))
            await bulkCopy.WriteToServerAsync(reader, cancellationToken);
    }
}

It is very similar to ObjectDataReader usage with the exception that the fields are not statically bound.

CSV File Import

Finally, the third demo action features CsvParser, DynamicDataReader, and SqlBulkCopy classes working together to achieve high performant and scalable data import in CSV format:

private static async Task RunCsvDatasetDemoAsync(SqlConnection connection, int count, 
    CancellationToken cancellationToken)
{
    using (var csvReader = new StreamReader(@"Data\CsvData.csv"))
    {
        var csvData = CsvParser.ParseHeadAndTail(csvReader, ',', '"');

        var csvHeader = csvData.Item1
            .Select((x, i) => new {Index = i, Field = x})
            .ToDictionary(x => x.Field, x => x.Index);

        var csvLines = csvData.Item2;

        var fields = await new TableSchemaProvider(connection, "CsvData").GetFieldsAsync();

        using (var bulkCopy = new SqlBulkCopy(connection))
        {
            bulkCopy.DestinationTableName = "CsvData";
            bulkCopy.BatchSize = 1000;
            bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;

            foreach (var field in fields)
                bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);

            using (var reader = new DynamicDataReader<IList<string>>(fields, csvLines.Take(count),
                (x, k) => x.GetValueOrDefault(csvHeader.GetValueOrDefault(k, -1))))
            {
                await bulkCopy.WriteToServerAsync(reader, cancellationToken);
            }
        }
    }
}

For demo purposes, there are only 1,000 rows in the CsvData.csv file. This particular solution though will be able to handle any number of rows with relatively stable performance. It will match column names in the CSV file with column names of the target table. Missing data will be populated with Nulls. Any extra columns not existing in the target table will be ignored.

Summary

In this article, I demonstrated one of the possible ways of handling high perfomant database inserts using managed code. My goal was to construct a flexible and easy to use API, so it could be applied to many different scenarios. In particular, use ObjectDataReader<T> to upload data represented as statically defined POCO classes, and DynamicDataReader<T> to upload data of any structure.

About

Two flavors of IDataReader implementations used together with SqlBulkCopy to insert static and dynamic data structures into SQL Server table.

Resources

Stars

Watchers

Forks

Packages

No packages published

Languages