Saturday, November 14, 2009

A Generic BDP Approach for SQL Server 2008

As I promised in my previous post (SQL Server and Bulk Data Persistence (BDP)), today I'll show a possible generic approach to integrating Bulk Data Persistence without writing a new enumerator for each domain object. In addition, we will encapsulate the ID synchronization into a small framework.

If you have not yet read the previous posts, please read them before you go on here. From here on, I assume you already know the idea of BDP and how to handle it with SQL Server 2008.

Our Task List

Let's recap the steps of the previous post to figure out all the tasks that have to be done:
  • We have to provide SqlMetaData objects to map our domain object properties
  • We have to provide an enumerator through the objects and create a SqlDataRecord
  • We have to call some T-SQL to do the persistence
  • We have to map all the new server ids to our domain objects
Sounds really complicated? It's not as hard as it sounds: more involved than a command or a simple strategy pattern, but not much more than a usual Unit of Work implementation.

ObjectState Enumeration

Before we start with our tasks, we have to fix a dirty non-OO cheat from the previous post: we replace the "pseudo enum" that represented the state of a domain object with a real enum, ObjectState.
public enum ObjectState
{
   Added,
   Deleted,
   Changed,
   Unchanged,
}
Okay, back to our task list...

Column Mapping

The next thing we need is a general class that maps a property of a domain object to a SqlMetaData object. Below we can see a possible SqlMergeColumn<T> class which represents all the information that is generally needed to create those meta data objects.
class SqlMergeColumn<T>
{
   internal SqlMergeColumn(string columnName, SqlDbType sqlType,
                           Func<T, object> getData) {
      ColumnName = columnName;
      SqlType = sqlType;
      _getData = getData;
   }

   // callback to get the value from a current item
   Func<T, object> _getData;
   
   public string ColumnName { get; private set; }
   public SqlDbType SqlType { get; private set; }
   public int MaxLength { get; internal set; }
   public byte Precision { get; internal set; }
   public byte Scale { get; internal set; }

   public virtual object GetData(T item) {
      // get the field value or DBNull
      return _getData(item) ?? DBNull.Value;
   }

   public SqlMetaData GetMetaData() {
      // get meta data for this property mapping
      if (MaxLength != 0)
         return new SqlMetaData(ColumnName, SqlType, MaxLength);
      if (Precision != 0)
         return new SqlMetaData(ColumnName, SqlType, Precision, Scale);
      return new SqlMetaData(ColumnName, SqlType);
   }
}
As you see, the larger part of the class is a simple set of properties that are needed to create a SqlMetaData object. The MaxLength property is needed for data types with variable length, such as text or binary types. The Precision and Scale properties are required for some numeric types, such as decimal.

The GetData(T):object method takes an item of the class's generic type and returns the mapped value by calling a Func<T, object> delegate that was provided to the constructor. (I always try to avoid reflection where possible, since it is quite slow and hard to maintain.)

The last method GetMetaData():SqlMetaData is a typical strategy method which returns the meta data for a SqlDataRecord enumerator depending on the current configuration of an instance of the class.

We have to handle two special column types. First, we need the identity column to be able to join the data for UPDATE and DELETE and to synchronize the server identity values with our newly added objects. Second, we need a column that tells us the type of BDP action (add, change, delete) to be executed. To stay OO geeky, we use two additional classes which inherit from SqlMergeColumn<T>.
// represents an identity merge column
sealed class SqlMergeIdColumn<T> : SqlMergeColumn<T>
{
   public SqlMergeIdColumn(string columnName, Func<T, int> getIdData, 
                           Action<T, int> setIdData)
      : base(columnName, SqlDbType.Int, (t) => getIdData(t)) {
      _setIdData = setIdData;
   }

   // private fields
   Action<T, int> _setIdData;

   public int GetIdData(T item) {
      return (int)base.GetData(item);
   }

   public void SetIdData(T item, int id) {
      _setIdData(item, id);
   }
}

// represents a merge action column
sealed class SqlMergeActionColumn<T> : SqlMergeColumn<T>
{
   public SqlMergeActionColumn(string columnName, 
                               Func<T, ObjectState> getMergeAction)
      : base(columnName, SqlDbType.Char, (t) => getMergeAction(t)) {
      MaxLength = 1;
   }

   public override object GetData(T item) {
      switch (GetMergeAction(item)) {
         case ObjectState.Added: return 'I';
         case ObjectState.Changed: return 'U';
         case ObjectState.Deleted: return 'D';
         default: return 'X';
      }
   }

   public ObjectState GetMergeAction(T item) {
      return (ObjectState)base.GetData(item);
   }
}
The SqlMergeIdColumn<T> class provides two additional methods: GetIdData(T):int to return a System.Int32 id value and SetIdData(T,int) to set a new id value after our database operation.

The SqlMergeActionColumn<T> class overrides the GetData(T):object method to transform the ObjectState enum into a System.Char for our data mapping.

GetMergeAction(T):ObjectState returns the enum value of a specified domain object.

Now that we have column classes to specify a mapping to a table-valued parameter, we need a collection to store those mappings. In this sample I use a collection inherited from System.Collections.ObjectModel.Collection<T>.
// represents a collection of SqlMergeColumn<T> objects
sealed class SqlMergeColumnCollection<T>
   : System.Collections.ObjectModel.Collection<SqlMergeColumn<T>>
{
   // add a data column
   public void AddDataColumn(string columnName, SqlDbType sqlType,
                             Func<T, object> getData) {
      this.AddDataColumn(columnName, sqlType, 0, getData);
   }

   // add a data column
   public void AddDataColumn(string columnName, SqlDbType sqlType,
                             int maxLength, Func<T, object> getData) {
      SqlMergeColumn<T> mapping = new SqlMergeColumn<T>(
         columnName, sqlType, getData);
      mapping.MaxLength = maxLength;
      base.Add(mapping);
   }

   // add a data column
   public void AddDataColumn(string columnName, SqlDbType sqlType,
                             byte precision, byte scale, 
                             Func<T, object> getData) {
      SqlMergeColumn<T> mapping = 
         new SqlMergeColumn<T>(columnName, sqlType, getData);
      mapping.Precision = precision;
      mapping.Scale = scale;
      base.Add(mapping);
   }

   // set the id column
   public void SetIdColumn(string columnName, 
                           Func<T, int> getIdData,
                           Action<T, int> setIdData) {
      base.Add(new SqlMergeIdColumn<T>(columnName, getIdData, setIdData));
   }

   // set the merge action column
   public void SetMergeColumn(string columnName, 
                              Func<T, ObjectState> getMergeAction) {
      base.Add(new SqlMergeActionColumn<T>(columnName, getMergeAction));
   }

   // get the id column
   internal SqlMergeIdColumn<T> GetIdColumn() {
      return this.OfType<SqlMergeIdColumn<T>>().First();
   }

   // get the merge column
   internal SqlMergeActionColumn<T> GetMergeColumn() {
      return this.OfType<SqlMergeActionColumn<T>>().First();
   }

   // get only the data columns
   internal IEnumerable<SqlMergeColumn<T>> GetDataColumns() {
      return this.Where(c => c.GetType() == typeof(SqlMergeColumn<T>));
   }
}
We've got three overloads of AddDataColumn to add ordinary data column mappings. In addition, we've got two methods (SetIdColumn and SetMergeColumn) to specify the special id and merge-action columns. Finally, we have three methods (GetIdColumn, GetMergeColumn and GetDataColumns) to access the different column types. I marked those methods internal since they are only needed within the BDP framework and usually not important for a consumer of the framework.

Note, this framework is just a sample implementation. To keep it simple, I didn't add the possibility to specify the order of the columns. This means all columns currently have to be specified in the same order as defined in the table-valued type. Feel free to add an order parameter to these methods if you need this.

Data Enumerator

As we know, we have to provide an IEnumerable<SqlDataRecord> as the value of a .NET SqlParameter which represents a table-valued parameter. The following listing shows a generic implementation of such an enumerable.
sealed class Sql2k8MergeEnumerable<T> : IEnumerable<SqlDataRecord>
{
   // create a new enumerator
   internal Sql2k8MergeEnumerable(IEnumerable<T> data, 
                                  SqlMergeColumnCollection<T> mappings) {
      _data = data;
      _mappings = mappings;
   }

   // private fields
   IEnumerable<T> _data;
   SqlMergeColumnCollection<T> _mappings;

   // the enumerator which will be called from ADO.NET for a SqlParameter value
   public IEnumerator<SqlDataRecord> GetEnumerator() {
      // get all SqlMetaData for specified column mappings
      SqlMetaData[] metaData =
        _mappings.Select(mapping => mapping.GetMetaData()).ToArray();

      foreach (var item in _data) {
         // create a new record
         SqlDataRecord record = new SqlDataRecord(metaData);
         // fill the record with its values
         for (int i = 0; i < _mappings.Count; i++)
            record.SetValue(i, _mappings[i].GetData(item));
         // return the current record
         yield return record;
      }
   }

   IEnumerator IEnumerable.GetEnumerator() {
      return this.GetEnumerator();
   }
}
The class gets the data to iterate through and an instance of our above implemented merge column collection.

The GetEnumerator():IEnumerator<SqlDataRecord> method utilizes the column mappings collection to create the meta data objects we need for a data record. After that it iterates over all items and yields one data record per item. To fill the records it uses the GetData method we already defined on our column classes.

Merge Command

A quick look back at our task list shows two missing tasks. We need something to generate the persisting T-SQL, and we have to sync the client surrogate ids with the server-side generated identity values. (I put both functionalities into one class; feel free to split them up for lower coupling.)
class Sql2k8MergeCommand<T>
{
   public Sql2k8MergeCommand(string tableName, string tableTypeName, 
                             IEnumerable<T> data) {
      _tableName = tableName;
      _tableTypeName = tableTypeName;
      _data = data;
      Columns = new SqlMergeColumnCollection<T>();
   }

   // private fields
   string _tableName;
   string _tableTypeName;
   IEnumerable<T> _data;

   public SqlMergeColumnCollection<T> Columns { get; private set; }
   public SqlConnection Connection { get; set; }
   public SqlTransaction Transaction { get; set; }

   // execute the merge command
   public void Execute() {
      // create an SqlCommand
      SqlCommand cmd = new SqlCommand();
      cmd.Connection = Connection;
      cmd.Transaction = Transaction;
      // get the dynamic SQL
      cmd.CommandText = GetCommandText();
      // create a parameter which gets all data as table-valued parameter
      SqlParameter data = cmd.Parameters.Add("@data", SqlDbType.Structured);
      data.TypeName = _tableTypeName;
      data.Value = new Sql2k8MergeEnumerable<T>(_data, Columns);

      // execute the command and read the IDENTITY and INSERT row count
      using (SqlDataReader reader = cmd.ExecuteReader()) {
         SyncSurrogateIds(reader);
      }
   }

   // get the sql command text to be executed
   private string GetCommandText() {
      StringBuilder update = new StringBuilder(0x100);
      StringBuilder insert = new StringBuilder(0x100);

      // build columns for INSERT and UPDATE
      foreach (var column in Columns.GetDataColumns()) {
         insert.AppendFormat("{0},", column.ColumnName);
         update.AppendFormat("trg.{0}=src.{0},", column.ColumnName);
      }
      // remove the trailing ","
      insert.Remove(insert.Length - 1, 1);
      update.Remove(update.Length - 1, 1);

      string id = Columns.GetIdColumn().ColumnName;
      string merge = Columns.GetMergeColumn().ColumnName;
      StringWriter sql = new StringWriter(new StringBuilder(0x400));

      // delete
      sql.WriteLine("DELETE trg FROM {0} trg WHERE EXISTS (", _tableName);
      sql.WriteLine("   SELECT * FROM @data src");
      sql.WriteLine("   WHERE src.{0} = trg.{0} AND src.{1} = 'D');", 
         id, merge);
      // update
      sql.WriteLine("UPDATE trg SET {0} FROM {1} trg", update, _tableName);
      sql.WriteLine("   JOIN @data src ON trg.{0} = src.{0} AND src.{1} = 'U';",
         id, merge);
      // insert
      sql.WriteLine("INSERT INTO {0} ({1}) SELECT {1} FROM @data", 
         _tableName, insert);
      sql.WriteLine("   WHERE {0} = 'I' ORDER BY {1};", merge, id);
      // return identity information
      sql.WriteLine("SELECT SCOPE_IDENTITY(), @@ROWCOUNT;");

      return sql.ToString();
   }

   private void SyncSurrogateIds(SqlDataReader reader) {
      int lastId;
      int count;

      // get identity information from reader
      reader.Read();
      lastId = reader.IsDBNull(0) ? -1 : (int)reader.GetDecimal(0);
      count = reader.GetInt32(1);
      // no new rows added
      if (count == 0)
         return;
      
      SqlMergeActionColumn<T> mergeCol = Columns.GetMergeColumn();
      SqlMergeIdColumn<T> idCol = Columns.GetIdColumn();
      // get client surrogate ids
      SortedList<int, T> idLookup = new SortedList<int, T>(count);
      foreach (var item in _data)
         if (mergeCol.GetMergeAction(item) == ObjectState.Added)
            idLookup.Add(idCol.GetIdData(item), item);

      // map client ids to server ids
      IList<int> clientIds = new List<int>(idLookup.Keys);
      foreach (var clientId in clientIds)
         idCol.SetIdData(idLookup[clientId], lastId - --count);
   }
}
The merge command class orchestrates all other classes and represents the consumer interface for this simple Bulk Data Persistence framework.

The constructor gets the name of the database table, the name of the table-valued type and the data to be persisted.

The properties Connection and Transaction are used to configure the destination database.

The Columns:SqlMergeColumnCollection<T> property enables the consumer to configure the mapping between a domain object and a database table.

The Execute() method performs the database operation. It creates an ADO.NET SqlCommand, builds the dynamic T-SQL by calling the GetCommandText():string method, executes the command and utilizes the SyncSurrogateIds(SqlDataReader) method to synchronize the ids of the added rows. Since the INSERT is ordered by the client id and the identity values are assigned consecutively, SyncSurrogateIds can derive each new id from SCOPE_IDENTITY() and the insert row count: if three rows were inserted and SCOPE_IDENTITY() returns 120, the added objects receive the ids 118, 119 and 120 in ascending order of their client ids.

As I wrote in my previous post, the SQL Server 2008 MERGE statement probably performs better than the classical separate INSERT/UPDATE/DELETE operations. I still have to investigate this and will share my results in a separate post.
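For reference, a MERGE-based variant of the generated statement might look roughly like this for the TestMany table. This is an untested sketch, not the framework's actual output; note that @@ROWCOUNT would then cover all three actions, so the id synchronization would need an adjustment (e.g. an OUTPUT clause for the inserted rows).

```sql
MERGE TestMany AS trg
USING @data AS src ON trg.Id = src.Id
WHEN MATCHED AND src.Action = 'D' THEN
   DELETE
WHEN MATCHED AND src.Action = 'U' THEN
   UPDATE SET SomeInt = src.SomeInt, SomeDate = src.SomeDate,
              SomeText = src.SomeText
WHEN NOT MATCHED BY TARGET AND src.Action = 'I' THEN
   INSERT (SomeInt, SomeDate, SomeText)
   VALUES (src.SomeInt, src.SomeDate, src.SomeText);
SELECT SCOPE_IDENTITY(), @@ROWCOUNT;
```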

How to use the Framework

Okay. Now we've got a shiny new framework. How do we use it?

I recycle the test environment from the previous post to persist TestMany objects.

Below is the T-SQL DDL which specifies the destination table and the table-valued type DTO.
-- the database table
CREATE TABLE TestMany (
   Id int NOT NULL IDENTITY(1,1)
      PRIMARY KEY CLUSTERED
   ,SomeInt int NULL
   ,SomeDate datetime NULL
   ,SomeText varchar(100) NULL
);

-- the table-valued type DTO
CREATE TYPE TestManyMergeType AS TABLE(
   Id int NOT NULL
   ,Action char(1) NOT NULL
      CHECK (Action IN ('D', 'U', 'I'))
   ,SomeInt int NULL
   ,SomeDate datetime NULL
   ,SomeText varchar(100) NULL
   
   ,PRIMARY KEY CLUSTERED 
      (Id, Action)
);
And our domain object which maps to the database table.
public class TestMany
{
   public TestMany(int id, int someInt, DateTime someDate,
                   string someText, ObjectState changeState) {
      Id = id;
      SomeInt = someInt;
      SomeDate = someDate;
      SomeText = someText;
      ChangeState = changeState;
   }

   public int Id { get; set; }
   public int SomeInt { get; set; }
   public DateTime SomeDate { get; set; }
   public string SomeText { get; set; }
   public ObjectState ChangeState { get; set; }
}
Anyway, what do we need to consume our framework?

We need one class which inherits from our Sql2k8MergeCommand<T> class to specify the column mapping.
class TestManyMergeCommand : Sql2k8MergeCommand<TestMany>
{
   public TestManyMergeCommand(IEnumerable<TestMany> data)
      : base("TestMany", "TestManyMergeType", 
             data.Where(t => t.ChangeState != ObjectState.Unchanged)) {
      // map the identity column
      Columns.SetIdColumn("Id", (t) => t.Id, (t, id) => t.Id = id);
      // map the merge action column
      Columns.SetMergeColumn("Action", (t) => t.ChangeState);
      // map the data columns
      Columns.AddDataColumn("SomeInt", SqlDbType.Int, 
                            (t) => t.SomeInt);
      Columns.AddDataColumn("SomeDate", SqlDbType.DateTime, 
                            (t) => t.SomeDate);
      Columns.AddDataColumn("SomeText", SqlDbType.VarChar, 100, 
                            (t) => t.SomeText);
   }
}
The class calls the base constructor to specify the destination table and the name of the table-valued type. The body of the constructor does the mapping. Done.
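To illustrate what happens under the hood: for this mapping, GetCommandText() produces T-SQL along these lines (whitespace aside).

```sql
DELETE trg FROM TestMany trg WHERE EXISTS (
   SELECT * FROM @data src
   WHERE src.Id = trg.Id AND src.Action = 'D');
UPDATE trg SET trg.SomeInt=src.SomeInt,trg.SomeDate=src.SomeDate,
               trg.SomeText=src.SomeText FROM TestMany trg
   JOIN @data src ON trg.Id = src.Id AND src.Action = 'U';
INSERT INTO TestMany (SomeInt,SomeDate,SomeText)
   SELECT SomeInt,SomeDate,SomeText FROM @data
   WHERE Action = 'I' ORDER BY Id;
SELECT SCOPE_IDENTITY(), @@ROWCOUNT;
```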

(At this point I have to say "Thank God for lambda expressions" for providing the callback methods. :-P )

Last but not least, here is a sample of how to use this class to persist data.
public void GenericMergeTest() {
   using (SqlConnection cn = GetConnection(true))
   using (SqlTransaction tran = cn.BeginTransaction()) {
      // get test data
      IList<TestMany> data = GetTestData(0);

      TestManyMergeCommand mergeCommand = new TestManyMergeCommand(data);
      mergeCommand.Connection = cn;
      mergeCommand.Transaction = tran;
      mergeCommand.Execute();

      tran.Commit();
   }
}
As you see, we just have to initialize our custom command with data to be persisted, specify the database information and execute the command.

Conclusion

As you see, the complete framework takes about 350 lines of code, which sounds quite fair to me.

The merge command class can be plugged into any existing O/R-Mapper framework that provides "Unit of Work" functionality and a method/event to hook into when data is persisted. If you want to use Bulk Data Persistence, you don't have to rewrite your existing persistence layer.

Note, I did not add any error handling to this sample. If you want to use this in a production environment, you have to add it (e.g. to guard against duplicate column names in the mapping).

If you already use an O/R-Mapper, you should change the way the properties are mapped to columns: in that case you don't need a custom command class for each of your domain objects, but you do need a way to translate the meta data of your O/R-Mapper into a merge command.
