Friday, May 9, 2014

Azure Table Storage: Writing data in a batch

Azure Table Storage supports writing items in a batch, as long as 1) all the items are in the same partition and 2) the batch contains at most 100 items (if it has more, you get ‘unexpected response code for operation : 0’).

If you have a big list of items spanning multiple partitions, you have to write the logic that splits the items by partition and sends at most 100 items at a time. This post provides a suggested implementation.
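
To see why the split is needed, here is a minimal sketch of the naive approach (not part of the downloadable sample): it pushes one partition’s 150 items into a single TableBatchOperation, so executing the batch fails with the error quoted above. It assumes ‘table’ and ‘items1’ (150 items, one partition) from the sample below.

    // Naive approach: one batch for the whole partition.
    var batch = new TableBatchOperation();
    foreach (ITableEntity entity in items1)
    {
        batch.Add(TableOperation.InsertOrReplace(entity));
    }
    table.ExecuteBatch(batch); // fails: the batch holds more than 100 operations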

The code and tests can be downloaded from here.

Here’s the main workflow. It does the following:

  1. Create a new Azure table (in the Storage Emulator).
  2. Create 2 groups of 150 items, each group in a different partition.
  3. Use the BatchCalculator class to split the items into batches.
  4. Write each batch to the table.

    using System;
    using System.Collections.Generic;
    using System.Globalization;
    using System.Linq;
    using System.Threading.Tasks;
    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Table;

    class Program
    {
        static void Main(string[] args)
        {
            // Make sure that Azure Storage Emulator is running
            
            Console.WriteLine("Creating new table");
            CloudTable table = CreateTable();
            try
            {
                const int numberOfItemsInPartition = 150;
                IEnumerable<ITableEntity> items1 = GetNewPartitionData(numberOfItemsInPartition);
                IEnumerable<ITableEntity> items2 = GetNewPartitionData(numberOfItemsInPartition);

                var all = new List<ITableEntity>(items1);
                all.AddRange(items2);

                Console.WriteLine("Writing {0} items", all.Count);
                BatchDiagnostics batchDiagnostics = WriteBatch(table, all).Result;

                Console.WriteLine("Diagnostic:" + batchDiagnostics);
            }
            catch(Exception exception)
            {
                Console.WriteLine(exception);
            }
            finally
            {
                // Keep the table alive until a key is pressed,
                // so it can be inspected in the emulator.
                Console.ReadLine();
                Console.WriteLine("Deleting table");
                table.Delete();
            }
        }

        private static CloudTable CreateTable()
        {
            CloudStorageAccount storageAccount = CloudStorageAccount.DevelopmentStorageAccount;

            // Create the table client.
            CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

            // Create the table if it doesn't exist.
            var table = tableClient.GetTableReference("batchtesttable");
            table.CreateIfNotExists();

            return table;
        }

        private static IEnumerable<ITableEntity> GetNewPartitionData(int numberOfItems)
        {
            var list = new List<ITableEntity>();

            // All items created here share the same, newly generated partition key.
            Guid partition = Guid.NewGuid();
            for (int i = 0; i < numberOfItems; i++)
            {
                list.Add(new MyEntity()
                {
                    SomeData = "Data",
                    RowKey = i.ToString(CultureInfo.InvariantCulture),
                    PartitionKey = partition.ToString()
                });
            }
            return list;
        }

        private static async Task<BatchDiagnostics> WriteBatch<T>(CloudTable table, IEnumerable<T> entities) where T : ITableEntity
        {
            BatchDiagnostics diagnostics;
            IEnumerable<TableBatchOperation> batches = BatchCalculator.GetBatches(entities, out diagnostics);

            // Execute all batches in parallel; each batch targets a single
            // partition and contains at most 100 operations.
            var tasks = batches.Select(table.ExecuteBatchAsync);
            await Task.WhenAll(tasks);

            return diagnostics;
        }
    }

    public class MyEntity : TableEntity
    {
        // Must be a public property (not a public field); the table client
        // only persists public properties with a getter and a setter.
        public string SomeData { get; set; }
    }

Here’s the implementation of BatchCalculator. This class is responsible for splitting the items into groups, where each group contains at most 100 items and all of the items in a group belong to the same partition.

    public class BatchCalculator
    {
        const int batchMaxSize = 100;

        public static IEnumerable<TableBatchOperation> GetBatches<T>(IEnumerable<T> entities, out BatchDiagnostics diagnostics) where T : ITableEntity
        {
            var list = new List<TableBatchOperation>();

            // A batch may only target a single partition, so group the entities by partition key first.
            IGrouping<string, T>[] partitionGroups = entities.GroupBy(arg => arg.PartitionKey).ToArray();
            foreach (IGrouping<string, T> entitiesGroupedByPartition in partitionGroups)
            {
                // Walk the partition's items in slices of at most batchMaxSize (100).
                T[] groupList = entitiesGroupedByPartition.ToArray();
                int pointer = batchMaxSize;
                T[] items = groupList.Take(pointer).ToArray();
                while (items.Any())
                {
                    var tableBatchOperation = new TableBatchOperation();
                    foreach (var item in items)
                    {
                        tableBatchOperation.Add(TableOperation.InsertOrReplace(item));
                    }
                    list.Add(tableBatchOperation);

                    // Advance to the next slice of the current partition.
                    items = groupList.Skip(pointer).Take(batchMaxSize).ToArray();
                    pointer += batchMaxSize;
                }
            }

            diagnostics = new BatchDiagnostics(partitionGroups.Length, list.Count);
            return list;
        }
    }

    public class BatchDiagnostics
    {
        private readonly int partitions;
        private readonly int batches;

        public BatchDiagnostics(int partitions, int batches)
        {
            this.partitions = partitions;
            this.batches = batches;
        }

        public int Partitions
        {
            get { return partitions; }
        }

        public int Batches
        {
            get { return batches; }
        }

        public override string ToString()
        {
            return string.Format("Partitions: {0}, Batches: {1}", partitions, batches);
        }
    }
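
As a quick sanity check, here is a sketch along the lines of the downloadable tests (not taken from them verbatim), reusing GetNewPartitionData from the sample above: 2 partitions of 150 items each should yield 2 partitions and 4 batches, because each partition splits into one batch of 100 and one batch of 50. Debug.Assert requires using System.Diagnostics.

    IEnumerable<ITableEntity> first = GetNewPartitionData(150);
    IEnumerable<ITableEntity> second = GetNewPartitionData(150);
    var all = new List<ITableEntity>(first);
    all.AddRange(second);

    BatchDiagnostics diagnostics;
    TableBatchOperation[] batches = BatchCalculator.GetBatches(all, out diagnostics).ToArray();

    Debug.Assert(diagnostics.Partitions == 2);
    Debug.Assert(diagnostics.Batches == 4);
    Debug.Assert(batches.All(batch => batch.Count <= 100)); // every batch is within the limit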

HTH! Aviad
