Transferring large volumes of data over the wire has always been a challenge. Several approaches try to solve this problem from different angles. Each suits different usage scenarios and has its own pros and cons. Here are three major options:
- FTP. Although FTP is fast and efficient, it is not easy to use. It lacks a well-designed API or object model, it has issues with firewalls, and it requires long-lived sessions that must not be interrupted during the transfer. In addition, FTP infrastructure requires configuration and maintenance effort.
- Streaming. Streaming is an efficient solution for transferring large amounts of data over HTTP or TCP. However, it does not support reliable messaging and does not scale well in high-traffic scenarios.
- Chunking. This solution has some overhead associated with splitting the data into chunks and reassembling them into a single stream. However, most of that overhead can be eliminated by an efficient architectural design. Chunking is well suited to large data transfers in high-traffic scenarios: it can support reliable messaging and security at both the message and transport levels, it uses memory efficiently, and it offers a straightforward way to implement partial uploads after connection failures.
In this article, we will walk through one possible implementation of the chunking solution.
The solution consists of a client (a WPF UI application) and a server (a WCF console application). On the server side, a WCF service exposes several methods for manipulating data files. It uses the Entity Framework Code First approach to persist data in SQL Server.
On the database side, there are two tables: `BlobFiles` and `BlobFileChunks`. Every record in the `BlobFiles` table describes an uploaded file and includes its Id (Guid), Name, Description, Size, the Id of the user who created it, and a creation timestamp. `BlobFileChunks`, on the other hand, is a detail table that references `BlobFiles` by foreign key. Every record in this table holds one chunk of binary data together with its sequential number (position) in the original file, stored in the `ChunkId` column.
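To make the two-table layout concrete, here is a minimal sketch of what the Code First entity classes might look like. Only the names `BlobFiles`, `BlobFileChunks`, `BlobFileId`, and `ChunkId` appear in the code shown in this article; every other property name below is an assumption made for illustration, and the actual project may differ.

```csharp
using System;
using System.Collections.Generic;

// Master record: one row per uploaded file.
public class BlobFiles
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public string Description { get; set; }
    public long Size { get; set; }
    public string CreatedBy { get; set; }   // assumed name for the creating user's Id
    public DateTime CreatedOn { get; set; } // assumed name for the creation timestamp

    // Navigation property to the detail rows (assumed).
    public virtual ICollection<BlobFileChunks> Chunks { get; set; } = new List<BlobFileChunks>();
}

// Detail record: one row per chunk of a file.
public class BlobFileChunks
{
    public Guid BlobFileId { get; set; } // foreign key to BlobFiles
    public int ChunkId { get; set; }     // position of the chunk in the original file
    public byte[] Data { get; set; }     // assumed name for the binary content

    public virtual BlobFiles BlobFile { get; set; }
}
```

With this shape, a composite key of (`BlobFileId`, `ChunkId`) would be a natural choice for the detail table, although the article does not say how the key is actually defined.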
On the client side, a grid displays the data files pulled from the database, along with a few buttons to refresh the data, upload a new file, remove an existing file, calculate a file's hash code, and save a file to disk. The latter two actions are purely for demonstration and emulate actual consumption of the file content on the server side.
Before using the code, make sure that the connection settings are configured correctly in the server's `App.config` file. By default, it uses a local instance of SQL Server with Integrated Security:
```xml
<entityFramework>
  <defaultConnectionFactory type="System.Data.Entity.Infrastructure.SqlConnectionFactory, EntityFramework">
    <parameters>
      <parameter value="Data Source=.;Integrated Security=True;MultipleActiveResultSets=True;" />
    </parameters>
  </defaultConnectionFactory>
  <providers>
    <provider invariantName="System.Data.SqlClient" type="System.Data.Entity.SqlServer.SqlProviderServices, EntityFramework.SqlServer" />
  </providers>
</entityFramework>
```
Since the project uses EF with automatic database migrations, the database is created automatically on the first run.
Without going too deep into details, let's focus on the major points of the solution: the mechanism for uploading chunks and later combining them.
On the client side, the following code is used to upload chunks to the server:
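```csharp
using (var service = new FileUploadServiceClient())
{
    // Create the master record and obtain the file Id.
    var fileId = await service.CreateBlobFileAsync(
        Path.GetFileName(fileName),
        fileName,
        new FileInfo(fileName).Length,
        Environment.UserName);

    using (var stream = File.OpenRead(fileName))
    {
        // Send up to 5 chunks in parallel and buffer at most 5 chunks.
        var edb = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };
        var ab = new ActionBlock<Tuple<byte[], int>>(
            x => service.AddBlobFileChunkAsync(fileId, x.Item2, x.Item1), edb);

        // Pair every chunk with its zero-based position in the file.
        foreach (var item in stream.GetByteChunks(chunkSize).Select((x, i) => Tuple.Create(x, i)))
            await ab.SendAsync(item); // waits while the block is at capacity

        // Signal that no more chunks will arrive, then wait for pending uploads.
        ab.Complete();
        await ab.Completion;
    }
}
```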
As you can see, the code uses the `ActionBlock` class (part of the TPL Dataflow library), which abstracts away asynchronous parallel processing. It sends at most 5 chunks at a time (`MaxDegreeOfParallelism`) and keeps at most 5 chunks buffered in memory (`BoundedCapacity`).
By calling `CreateBlobFileAsync`, we create the master record with the file details and obtain its `BlobFileId`. We then open a file stream and call the extension method `GetByteChunks`, which returns an `IEnumerable<byte[]>`. This enumerable is evaluated lazily, and every element contains a byte array of the specified size (the last element may contain less data). The chunk size defaults to 2 MB and can be adjusted to suit the usage scenario. The chunks, together with their sequential IDs, are sent to the server in parallel via the `AddBlobFileChunkAsync` method. Note that the chunks do not have to be sent in order, as long as their sequential IDs represent their actual positions in the file.
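To illustrate why the arrival order does not matter, here is a small in-memory sketch. `ChunkStore` is a hypothetical stand-in for the `BlobFileChunks` table and is not part of the article's code; it only shows that ordering by chunk ID is enough to rebuild the original content.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the BlobFileChunks table.
// Not thread-safe; a real server would rely on the database for concurrency.
public class ChunkStore
{
    private readonly Dictionary<int, byte[]> m_chunks = new Dictionary<int, byte[]>();

    // Chunks may be added in any order; the ID records the position in the file.
    public void Add(int chunkId, byte[] data)
    {
        m_chunks[chunkId] = data;
    }

    // Rebuilds the original content by concatenating chunks in ID order.
    public byte[] Reassemble()
    {
        return m_chunks.OrderBy(x => x.Key).SelectMany(x => x.Value).ToArray();
    }
}
```

Adding chunks 2, 0, and 1 in that order and then calling `Reassemble` returns the bytes in their original sequence, because only the IDs determine the final layout.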
The implementation of the `GetByteChunks` extension method is shown below:
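```csharp
public static IEnumerable<byte[]> GetByteChunks(this Stream stream, int length)
{
    // Validate eagerly; inside an iterator these checks would be deferred until enumeration.
    if (stream == null)
        throw new ArgumentNullException("stream");
    if (length <= 0)
        throw new ArgumentOutOfRangeException("length");

    return GetByteChunksIterator(stream, length);
}

private static IEnumerable<byte[]> GetByteChunksIterator(Stream stream, int length)
{
    var buffer = new byte[length];
    int count;

    // Read returns 0 only when the end of the stream has been reached.
    while ((count = stream.Read(buffer, 0, buffer.Length)) != 0)
    {
        // Copy only the bytes actually read, so the last chunk may be shorter.
        var result = new byte[count];
        Array.Copy(buffer, 0, result, 0, count);
        yield return result;
    }
}
```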
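As a quick illustration of the chunking behavior, the sketch below runs the same read loop over a `MemoryStream` and reports the size of each chunk. The `ChunkDemo` class and its `ChunkSizes` helper are hypothetical names introduced only for this example; the chunking logic is repeated so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class ChunkDemo
{
    // Same read loop as the article's GetByteChunks, repeated for self-containment.
    public static IEnumerable<byte[]> GetByteChunks(this Stream stream, int length)
    {
        var buffer = new byte[length];
        int count;
        while ((count = stream.Read(buffer, 0, buffer.Length)) != 0)
        {
            var result = new byte[count];
            Array.Copy(buffer, 0, result, 0, count);
            yield return result;
        }
    }

    // Splits 'size' bytes of dummy data and returns the length of each chunk.
    public static int[] ChunkSizes(int size, int chunkSize)
    {
        using (var stream = new MemoryStream(new byte[size]))
        {
            // ToArray forces enumeration before the stream is disposed.
            return stream.GetByteChunks(chunkSize).Select(x => x.Length).ToArray();
        }
    }
}
```

For example, `ChunkDemo.ChunkSizes(10, 4)` yields chunk lengths of 4, 4, and 2: two full chunks followed by a shorter final one.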
Let's explore the `ProcessFileAsync` method on the server side:
```csharp
public async Task<string> ProcessFileAsync(Guid blobFileId)
{
    // Load only the chunk IDs, ordered by their position in the file.
    List<int> chunks;
    using (var context = new FileUploadDemoContext())
    {
        chunks = await context.Set<BlobFileChunks>()
            .Where(x => x.BlobFileId == blobFileId)
            .OrderBy(x => x.ChunkId)
            .Select(x => x.ChunkId)
            .ToListAsync();
    }

    var result = 0;

    // MultiStream presents the individual chunks as one continuous stream.
    using (var stream = new MultiStream(GetBlobStreams(blobFileId, chunks)))
    {
        // Fold the hash of every 1 KB block into a running hash code.
        foreach (var chunk in stream.GetByteChunks(1024))
            result = (result * 31) ^ ComputeHash(chunk);
    }

    return string.Format("File Hash Code is: {0}", result);
}
```
This method reads the entire file content lazily and calculates its hash code. First, we retrieve from the database the IDs of all chunks associated with the given `BlobFileId`, sorted by `ChunkId`. Those chunk IDs are then passed to the `GetBlobStreams` helper method, which retrieves the actual binary content of the chunks from the database on demand, one after another.
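The body of `GetBlobStreams` is not shown in this article. As a rough sketch of the on-demand idea, the helper below yields one stream per chunk ID and only loads a chunk when the consumer asks for the next stream. The `LazyBlobStreams` class and the `loadChunk` delegate are assumptions made for illustration; in the real service, the delegate would correspond to a database query for a single chunk.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class LazyBlobStreams
{
    // Yields one read-only stream per chunk ID. Because this is an iterator,
    // loadChunk (a hypothetical loader) runs only when the next stream is requested.
    public static IEnumerable<Stream> Create(IEnumerable<int> chunkIds, Func<int, byte[]> loadChunk)
    {
        foreach (var id in chunkIds)
            yield return new MemoryStream(loadChunk(id), false);
    }
}
```

Since nothing is loaded until enumeration starts, a consumer that disposes each stream before moving to the next keeps only one chunk in memory at a time.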
The `MultiStream` class, described below, plays the key role in combining the chunks.
`MultiStream` inherits from the standard `Stream` class and represents a read-only stream that can only seek forward. The `Seek` method ensures that the position never moves backwards:
```csharp
public override long Seek(long offset, SeekOrigin origin)
{
    switch (origin)
    {
        case SeekOrigin.Begin:
            m_position = offset;
            break;
        case SeekOrigin.Current:
            m_position += offset;
            break;
        case SeekOrigin.End:
            // Per the Stream.Seek contract, the offset is added to the end position.
            m_position = m_length + offset;
            break;
    }

    // Clamp to the end of the stream.
    if (m_position > m_length)
        m_position = m_length;

    // Chunks before m_minPosition have already been released, so going back is not possible.
    if (m_position < m_minPosition)
    {
        m_position = m_minPosition;
        throw new NotSupportedException("Cannot seek backwards");
    }

    return m_position;
}
```
In the `Read` method, we pull chunks on an as-needed basis until we reach the last one. This way, only one chunk at a time is held in memory, which keeps memory usage low and scales well:
```csharp
public override int Read(byte[] buffer, int offset, int count)
{
    var result = 0;
    while (true)
    {
        if (m_stream == null)
        {
            // Pull the next chunk; when there are none left, the total length is known.
            if (!m_streamEnum.MoveNext())
            {
                m_length = m_position;
                break;
            }
            m_stream = m_streamEnum.Current;
        }

        if (m_position >= m_minPosition + m_stream.Length)
        {
            // The position is past the current chunk: release it and move on.
            m_minPosition += m_stream.Length;
            m_stream.Dispose();
            m_stream = null;
        }
        else
        {
            // Translate the global position into a position within the current chunk.
            m_stream.Position = m_position - m_minPosition;
            var bytesRead = m_stream.Read(buffer, offset, count);
            result += bytesRead;
            offset += bytesRead;
            m_position += bytesRead;

            if (bytesRead < count)
            {
                // The chunk is exhausted: keep reading the remainder from the next one.
                count -= bytesRead;
                m_minPosition += m_stream.Length;
                m_stream.Dispose();
                m_stream = null;
            }
            else
                break;
        }
    }
    return result;
}
```
- The current approach requires both the client and the server to be aware of the protocol
- The solution is well suited for intranet and internet scenarios
- The DB persistence layer can easily be replaced by the file system if needed
- Reliable messaging can easily be implemented on top of the current architecture
- There is a lot of room for improvement, so please feel free to post your suggestions or concerns.
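As one example of building on the current architecture, partial uploads could be resumed by asking the server which chunk IDs it already holds and sending only the rest. The sketch below covers just the client-side calculation. The `UploadResume` class is hypothetical, and it assumes a service method (not present in the article) that returns the stored chunk IDs for a file, with IDs starting at 0 as in the upload code above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class UploadResume
{
    // Total number of chunks needed for a file of the given size (rounds up).
    public static int ChunkCount(long fileSize, int chunkSize)
    {
        return (int)((fileSize + chunkSize - 1) / chunkSize);
    }

    // IDs of the chunks that the server has not stored yet and that still need to be sent.
    public static List<int> MissingChunkIds(long fileSize, int chunkSize, IEnumerable<int> storedIds)
    {
        var stored = new HashSet<int>(storedIds);
        return Enumerable.Range(0, ChunkCount(fileSize, chunkSize))
                         .Where(id => !stored.Contains(id))
                         .ToList();
    }
}
```

For a 10-byte file split into 4-byte chunks where the server already has chunks 0 and 2, `MissingChunkIds` returns only chunk 1, so the client can seek to that offset and upload a single chunk instead of restarting the transfer.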