// Copyright Epic Games, Inc. All Rights Reserved.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EpicGames.Core;
using EpicGames.Horde.Storage.Nodes;
using EpicGames.Horde.Utilities;

namespace EpicGames.Horde.Storage
{
	/// <summary>
	/// Base class for implementations of a content defined chunker
	/// </summary>
	public abstract class ContentChunker
	{
		/// <summary>
		/// Source channel.
		/// </summary>
		public abstract ChannelWriter<ChunkerSource> SourceWriter { get; }

		/// <summary>
		/// Output channel. The order of items written to the source writer will be preserved in the output reader. Each
		/// output item should be read completely (through calls to <see cref="ChunkerOutput.MoveNextAsync(CancellationToken)"/>)
		/// before reading the next.
		/// </summary>
		public abstract ChannelReader<ChunkerOutput> OutputReader { get; }
	}

	/// <summary>
	/// Base class for input to a <see cref="ContentChunker"/>. Allows the chunker to read data into a buffer as required.
	/// </summary>
	public abstract class ChunkerSource
	{
		/// <summary>
		/// Length of the input data
		/// </summary>
		public abstract long Length { get; }

		/// <summary>
		/// Optional user specified data to be propagated to the output
		/// </summary>
		public virtual object? UserData { get; } = null;

		/// <summary>
		/// Starts a task to read the next chunk of data. Note that this task may be called again before the task completes.
		/// The outer task completes once the read has been issued; awaiting the inner task waits for the data itself.
		/// </summary>
		/// <param name="memory">Buffer to store the data</param>
		/// <param name="cancellationToken">Cancellation token for the operation</param>
		public abstract Task<Task> StartReadAsync(Memory<byte> memory, CancellationToken cancellationToken = default);
	}

	/// <summary>
	/// Implementation of <see cref="ChunkerSource"/> for files on disk
	/// </summary>
	public class FileChunkerSource : ChunkerSource
	{
		readonly FileInfo _fileInfo;
		readonly object? _userData;
		long _offset; // Next read position; advanced eagerly in StartReadAsync so overlapping reads don't clash

		/// <inheritdoc/>
		public override object? UserData => _userData;

		/// <summary>
		/// Constructor
		/// </summary>
		public FileChunkerSource(FileInfo fileInfo, object? userData)
		{
			_fileInfo = fileInfo;
			_userData = userData;
		}

		/// <inheritdoc/>
		public override long Length => _fileInfo.Length;

		/// <inheritdoc/>
		public override Task<Task> StartReadAsync(Memory<byte> memory, CancellationToken cancellationToken = default)
		{
			// Capture the offset for this read before kicking it off, so concurrent calls each get a distinct range
			long offset = _offset;
			_offset += memory.Length;
			return Task.FromResult(HandleReadAsync(offset, memory, cancellationToken));
		}

		// Reads memory.Length bytes from the given file offset. Each read opens its own handle, so reads can
		// run concurrently without sharing stream position.
		async Task HandleReadAsync(long offset, Memory<byte> memory, CancellationToken cancellationToken)
		{
			// NOTE(review): PlatformFileLock presumably throttles concurrent open file handles — confirm in EpicGames.Core
			using PlatformFileLock? platformFileLock = await PlatformFileLock.CreateAsync(cancellationToken);

			FileStreamOptions options = new FileStreamOptions { Mode = FileMode.Open, Access = FileAccess.Read, Options = FileOptions.Asynchronous };
			using FileStream stream = _fileInfo.Open(options);

			stream.Seek(offset, SeekOrigin.Begin);
			await stream.ReadExactlyAsync(memory, cancellationToken);
		}
	}

	/// <summary>
	/// Implementation of <see cref="ChunkerSource"/> for data in memory
	/// </summary>
	class MemoryChunkerSource : ChunkerSource
	{
		readonly ReadOnlyMemory<byte> _data;
		int _offset; // Next read position within _data

		/// <inheritdoc/>
		public override long Length => _data.Length;

		/// <summary>
		/// Constructor
		/// </summary>
		public MemoryChunkerSource(Memory<byte> data) => _data = data;

		/// <inheritdoc/>
		public override Task<Task> StartReadAsync(Memory<byte> memory, CancellationToken cancellationToken = default)
		{
			// Data is already in memory; the copy completes synchronously, so the inner task is already done
			_data.Slice(_offset, memory.Length).CopyTo(memory);
			_offset += memory.Length;
			return Task.FromResult(Task.CompletedTask);
		}
	}

	/// <summary>
	/// Enumerates chunks from an input file
	/// </summary>
	public abstract class ChunkerOutput
	{
		/// <summary>
		/// Rolling hash for this chunk
		/// </summary>
		public abstract uint RollingHash { get; }

		/// <summary>
		/// Accessor for the chunk's data
		/// </summary>
		public abstract ReadOnlyMemory<byte> Data { get; }

		/// <summary>
		/// User specified data from the <see cref="ChunkerSource"/>
		/// </summary>
		public abstract object? UserData { get; }

		/// <summary>
		/// Moves to the next output chunk
		/// </summary>
		/// <param name="cancellationToken">Cancellation token for the operation</param>
		/// <returns>True if there was another output chunk</returns>
		public abstract ValueTask<bool> MoveNextAsync(CancellationToken cancellationToken = default);
	}

	/// <summary>
	/// Simple serial implementation of content chunking
	/// </summary>
	public class SerialBuzHashChunker : ContentChunker
	{
		// Output state for a single source. Chunk data is served directly out of the shared chunker buffer,
		// which is why outputs must be consumed in order (enforced via _order below).
		class Output : ChunkerOutput
		{
			readonly SerialBuzHashChunker _pipeline;
			public readonly ChunkerSource Input;
			public bool _firstOutput = true;
			public uint _rollingHash;
			public long _inputOffset;   // Bytes read from the source so far
			public long _outputOffset;  // Bytes emitted as chunks so far
			public ReadOnlyMemory<byte> _data;

			public override uint RollingHash => _rollingHash;
			public override ReadOnlyMemory<byte> Data => _data;
			public override object? UserData => Input.UserData;

			public Output(SerialBuzHashChunker pipeline, ChunkerSource input)
			{
				_pipeline = pipeline;
				Input = input;
			}

			/// <inheritdoc/>
			public override ValueTask<bool> MoveNextAsync(CancellationToken cancellationToken = default)
				=> _pipeline.MoveNextAsync(Input, this, cancellationToken);
		}

		readonly Memory<byte> _buffer;
		readonly Channel<ChunkerSource> _sourceChannel;
		readonly Channel<ChunkerOutput> _outputChannel;
		readonly LeafChunkedDataNodeOptions _options;
		readonly Queue<Output> _order = new Queue<Output>(); // Non-empty sources, in the order they must be consumed
		int _bufferOffset;
		int _bufferLength;
		int _chunkLength;

		/// <inheritdoc/>
		public override ChannelWriter<ChunkerSource> SourceWriter => _sourceChannel.Writer;

		/// <inheritdoc/>
		public override ChannelReader<ChunkerOutput> OutputReader => _outputChannel.Reader;

		/// <summary>
		/// Constructor
		/// </summary>
		public SerialBuzHashChunker(AsyncPipeline pipeline, Memory<byte> buffer, LeafChunkedDataNodeOptions options)
		{
			_buffer = buffer;
			_sourceChannel = Channel.CreateUnbounded<ChunkerSource>();
			_outputChannel = Channel.CreateUnbounded<ChunkerOutput>();
			_options = options;

			_ = pipeline.AddTask(ProcessInputAsync);
		}

		// Forwards each queued source to the output channel, tracking consumption order for non-empty sources.
		async Task ProcessInputAsync(CancellationToken cancellationToken)
		{
			ChannelReader<ChunkerSource> inputReader = _sourceChannel.Reader;
			ChannelWriter<ChunkerOutput> outputWriter = _outputChannel.Writer;
			while (await inputReader.WaitToReadAsync(cancellationToken))
			{
				while (inputReader.TryRead(out ChunkerSource? input))
				{
					Output output = new Output(this, input);
					if (input.Length > 0)
					{
						lock (_order)
						{
							_order.Enqueue(output);
						}
					}
					await outputWriter.WriteAsync(output, cancellationToken);
				}
			}
			outputWriter.Complete();
		}

		// Advances the given output to its next chunk. Since all outputs share one buffer, this verifies the
		// caller is consuming outputs strictly in queue order.
		async ValueTask<bool> MoveNextAsync(ChunkerSource input, Output output, CancellationToken cancellationToken)
		{
			bool firstOutput = output._firstOutput;
			output._firstOutput = false;

			if (output._outputOffset == output.Input.Length)
			{
				// Source exhausted. An empty source still yields exactly one empty chunk.
				output._data = ReadOnlyMemory<byte>.Empty;
				return firstOutput;
			}
			else
			{
				lock (_order)
				{
					// Drop finished sources from the front, then check that this output is next in line
					while (_order.TryPeek(out Output? current) && current._outputOffset == current.Input.Length)
					{
						_order.Dequeue();
					}
					if (output != _order.Peek())
					{
						throw new InvalidOperationException();
					}
				}

				// Discard the previous chunk
				_bufferOffset += _chunkLength;
				_bufferLength -= _chunkLength;

				// Shrink the buffer once we don't have enough space for a full chunk
				if (_bufferLength < _options.MaxSize)
				{
					// Move the remaining buffer to the start
					if (_bufferOffset > 0)
					{
						Memory<byte> source = _buffer.Slice(_bufferOffset, _bufferLength);
						source.CopyTo(_buffer);
						_bufferOffset = 0;
					}

					// Top up the input buffer
					int readLength = (int)Math.Min(input.Length - output._inputOffset, _buffer.Length - _bufferLength);
					if (readLength > 0)
					{
						Task readTask = await input.StartReadAsync(_buffer.Slice(_bufferLength, readLength), cancellationToken);
						await readTask;
						output._inputOffset += readLength;
						_bufferLength += readLength;
					}
				}

				// Find the next chunk
				_chunkLength = BuzHash.FindChunkLength(_buffer.Span.Slice(_bufferOffset, _bufferLength), _options.MinSize, _options.MaxSize, _options.TargetSize, out output._rollingHash);
				output._data = _buffer.Slice(_bufferOffset, _chunkLength);
				output._outputOffset += _chunkLength;
				return true;
			}
		}
	}

	/// <summary>
	/// Parallel implementation of <see cref="ContentChunker"/>
	/// </summary>
	public class ParallelBuzHashChunker : ContentChunker
	{
		const int ReadBlockSize = 256 * 1024;

		// A source queued for reading; FirstBlockTcs completes with the source's first processed block.
		record class InputRequest(ChunkerSource Input, TaskCompletionSource<Block> FirstBlockTcs);

		// A pending read of one block; BlockTcs completes once split points have been found. NextBlock links
		// to the following block of the same source, forming a chain the output walks.
		record class BlockRequest(int BufferOffset, int Length, Task ReadTask, TaskCompletionSource<Block> BlockTcs, Task<Block>? NextBlock);

		// A processed block: its location in the ring buffer, candidate split points, and the next block (null = last).
		record class Block(int BufferOffset, int Length, List<(int, uint)> Boundaries, Task<Block>? Next);

		// Ring buffer allocator over a fixed Memory<byte>. Allocations are contiguous and never wrap across the
		// end of the usable region; slack beyond _wrapSize is reserved so a block can be extended by up to one
		// max-size chunk (see MoveNextAsync).
		class RingBuffer
		{
			readonly object _lockObject = new object();
			int _head;              // Next allocation position
			int _size;              // Bytes currently allocated (including any skipped tail on wrap)
			readonly Memory<byte> _data;
			readonly int _wrapSize; // Usable region; allocations wrap to zero rather than straddle this boundary
			readonly AsyncEvent _freeEvent = new AsyncEvent();

			public RingBuffer(Memory<byte> data, int wrapSize)
			{
				_data = data;
				_wrapSize = wrapSize;
			}

			public Memory<byte> GetData(int offset, int length) => _data.Slice(offset, length);

			// Allocates a contiguous region of the given size, waiting for space to be freed if necessary.
			// Returns the offset of the allocated region.
			public async Task<int> AllocAsync(int blockSize, CancellationToken cancellationToken)
			{
				if (blockSize > _wrapSize)
				{
					throw new ArgumentException($"Requested block size is too large ({blockSize} > {_wrapSize})", nameof(blockSize));
				}
				for (; ; )
				{
					// Capture the event task before checking, so a Free between the check and the await still wakes us
					Task freeTask = _freeEvent.Task;
					lock (_lockObject)
					{
						// Figure out how much of the buffer we need to allocate, wrapping round to the start if necessary
						int allocResult = _head;
						int allocLength = blockSize;
						if (_head + blockSize > _wrapSize)
						{
							allocResult = 0;
							allocLength += (_wrapSize - _head);
						}

						// Check if we have the space available to allocate
						if (_size + allocLength <= _wrapSize)
						{
							_head = allocResult + blockSize;
							_size += allocLength;
							return allocResult;
						}
					}

					// Wait for more space to become available
					await freeTask.WaitAsync(cancellationToken);
				}
			}

			// Releases a previously allocated region. Regions are expected to be freed in allocation order;
			// _size is recomputed as the distance from the end of the freed region round to _head.
			public void Free(int offset, int length)
			{
				if (offset < 0)
				{
					throw new ArgumentException("Offset is out of range", nameof(offset));
				}
				if (length < 0 || offset + length > _wrapSize)
				{
					throw new ArgumentException("Length is out of range", nameof(length));
				}
				if (length > 0)
				{
					lock (_lockObject)
					{
						int end = offset + length;
						if (offset < _head)
						{
							_size = _head - end;
						}
						else
						{
							_size = (_wrapSize - end) + _head;
						}
					}
					_freeEvent.Set();
				}
			}
		}

		// Output implementation which walks the chain of processed blocks for one source, carving chunks out
		// of the ring buffer using the precomputed candidate split points.
		class LatestCdcOutput : ChunkerOutput
		{
			readonly ParallelBuzHashChunker _pipeline;
			readonly object? _userData;

			Task<Block> _blockTask;
			int _blockOffset;   // Offset of the current chunk within the current block
			int _boundaryIdx;   // Next candidate boundary to consider in the current block
			uint _rollingHash;
			ReadOnlyMemory<byte> _data;
			bool _firstChunk = true;

			public LatestCdcOutput(ParallelBuzHashChunker pipeline, object? userData, Task<Block> blockTask)
			{
				_pipeline = pipeline;
				_userData = userData;
				_blockTask = blockTask;
			}

			public override uint RollingHash => _rollingHash;
			public override ReadOnlyMemory<byte> Data => _data;
			public override object? UserData => _userData;

			public override async ValueTask<bool> MoveNextAsync(CancellationToken cancellationToken = default)
			{
				RingBuffer ringBuffer = _pipeline._ringBuffer;
				LeafChunkedDataNodeOptions options = _pipeline._options;

				// Always need to return at least one chunk, even if the source data is empty. Use this flag to track whether
				// we've returned something.
				bool firstChunk = _firstChunk;
				_firstChunk = false;

				for (; ; )
				{
					Block block = await _blockTask.WaitAsync(cancellationToken);
					ReadOnlyMemory<byte> blockData = ringBuffer.GetData(block.BufferOffset, block.Length);

					// Release data from the current chunk
					_blockOffset += _data.Length;
					_data = ReadOnlyMemory<byte>.Empty;

					// Enumerate all the chunks from the cached boundaries
					for (; _boundaryIdx < block.Boundaries.Count; _boundaryIdx++)
					{
						(int boundaryOffset, _rollingHash) = block.Boundaries[_boundaryIdx];
						if (boundaryOffset >= _blockOffset + options.MaxSize)
						{
							// Boundary is beyond the max chunk size; emit a max-size chunk and keep the same
							// boundary index, since it may still terminate a later chunk
							_data = blockData.Slice(_blockOffset, options.MaxSize);
							return true;
						}
						if (boundaryOffset >= _blockOffset + options.MinSize)
						{
							_data = blockData.Slice(_blockOffset, boundaryOffset - _blockOffset);
							return true;
						}
					}

					// If this is the last block, return all the remaining data
					if (block.Next == null)
					{
						_data = blockData.Slice(_blockOffset);
						_ = BuzHash.FindChunkLength(_data.Span, options.MinSize, options.MaxSize, options.TargetSize, out _rollingHash);
						if (_data.Length == 0 && !firstChunk)
						{
							ringBuffer.Free(block.BufferOffset, block.Length);
							_blockTask = null!;
							return false;
						}
						return true;
					}

					// If we still have remaining data in the current block, find a chunk that spans this and the next one
					if (_blockOffset < block.Length)
					{
						Block nextBlock = await block.Next.WaitAsync(cancellationToken);

						// If the two blocks aren't contiguous, append data from the next block to make the max chunk size.
						// We allocate slack space in the buffer to ensure that we have enough space to support this.
						int appendLength = Math.Min((_blockOffset + options.MaxSize) - block.Length, nextBlock.Length);
						if (appendLength > 0)
						{
							int blockEnd = block.BufferOffset + block.Length;
							if (nextBlock.BufferOffset != blockEnd)
							{
								ReadOnlyMemory<byte> copyData = ringBuffer.GetData(nextBlock.BufferOffset, appendLength);
								copyData.CopyTo(ringBuffer.GetData(blockEnd, appendLength));
							}
							blockData = ringBuffer.GetData(block.BufferOffset, block.Length + appendLength);
						}

						int chunkLength = BuzHash.FindChunkLength(blockData.Slice(_blockOffset).Span, options.MinSize, options.MaxSize, options.TargetSize, out _rollingHash);
						_data = blockData.Slice(_blockOffset, chunkLength);
						return true;
					}

					// Release data from the current block
					ringBuffer.Free(block.BufferOffset, block.Length);

					// Move to the next block
					_boundaryIdx = 0;
					_blockOffset -= block.Length;
					_blockTask = block.Next;
				}
			}
		}

		readonly RingBuffer _ringBuffer;
		readonly LeafChunkedDataNodeOptions _options;
		readonly Channel<ChunkerSource> _inputChannel;
		readonly Channel<ChunkerOutput> _outputChannel;
		readonly Channel<InputRequest> _inputRequestChannel;
		readonly Channel<BlockRequest> _blockRequestChannel;

		/// <inheritdoc/>
		public override ChannelWriter<ChunkerSource> SourceWriter => _inputChannel.Writer;

		/// <inheritdoc/>
		public override ChannelReader<ChunkerOutput> OutputReader => _outputChannel.Reader;

		/// <summary>
		/// Constructor
		/// </summary>
		public ParallelBuzHashChunker(AsyncPipeline pipeline, LeafChunkedDataNodeOptions options)
			: this(pipeline, new byte[2 * 1024 * 1024], options)
		{
		}

		/// <summary>
		/// Constructor
		/// </summary>
		public ParallelBuzHashChunker(AsyncPipeline pipeline, Memory<byte> buffer, LeafChunkedDataNodeOptions options)
		{
			if (buffer.Length < options.MaxSize * 3)
			{
				throw new ArgumentException("Buffer must be large enough to contain three blocks of maximum size", nameof(buffer));
			}
			if (options.MaxSize > ReadBlockSize)
			{
				throw new ArgumentException($"Max chunk size must not exceed {ReadBlockSize} bytes", nameof(options));
			}

			// Reserve one max-size chunk of slack past the wrap point, so a block can be extended with data
			// from the following block when a chunk straddles the two (see LatestCdcOutput.MoveNextAsync)
			_ringBuffer = new RingBuffer(buffer, buffer.Length - options.MaxSize);
			_options = options;
			_inputChannel = Channel.CreateUnbounded<ChunkerSource>();
			_outputChannel = Channel.CreateUnbounded<ChunkerOutput>();
			_inputRequestChannel = Channel.CreateUnbounded<InputRequest>();
			_blockRequestChannel = Channel.CreateUnbounded<BlockRequest>();

			_ = pipeline.AddTask(ProcessInputAsync);
			_ = pipeline.AddTask(ProcessInputRequestsAsync);
			_ = pipeline.AddTasks(32, ProcessBlockRequestsAsync);
		}

		// Converts queued sources into outputs, preserving order, and forwards them to the output channel.
		async Task ProcessInputAsync(CancellationToken cancellationToken)
		{
			ChannelReader<ChunkerSource> inputReader = _inputChannel.Reader;
			ChannelWriter<ChunkerOutput> outputWriter = _outputChannel.Writer;
			while (await inputReader.WaitToReadAsync(cancellationToken))
			{
				ChunkerSource? input;
				while (inputReader.TryRead(out input))
				{
					ChunkerOutput output = await AddAsync(input, cancellationToken);
					await outputWriter.WriteAsync(output, cancellationToken);
				}
			}
			_inputRequestChannel.Writer.Complete();
			outputWriter.Complete();
		}

		// Queues a source for reading and returns an output whose first block resolves once processed.
		async Task<ChunkerOutput> AddAsync(ChunkerSource input, CancellationToken cancellationToken = default)
		{
			InputRequest inputRequest = new InputRequest(input, new TaskCompletionSource<Block>(TaskCreationOptions.RunContinuationsAsynchronously));
			await _inputRequestChannel.Writer.WriteAsync(inputRequest, cancellationToken);
			return new LatestCdcOutput(this, input.UserData, inputRequest.FirstBlockTcs.Task);
		}

		// Splits each source into fixed-size read blocks, allocating ring buffer space and issuing reads,
		// then posts block requests for the worker tasks to process.
		async Task ProcessInputRequestsAsync(CancellationToken cancellationToken)
		{
			ChannelReader<InputRequest> reader = _inputRequestChannel.Reader;
			ChannelWriter<BlockRequest> writer = _blockRequestChannel.Writer;
			while (await reader.WaitToReadAsync(cancellationToken))
			{
				while (reader.TryRead(out InputRequest? inputRequest))
				{
					ChunkerSource input = inputRequest.Input;

					TaskCompletionSource<Block>? blockTcs = inputRequest.FirstBlockTcs;
					for (long sourceOffset = 0; blockTcs != null;)
					{
						// Read a block of data
						int readLength = (int)Math.Min(input.Length - sourceOffset, ReadBlockSize);
						int bufferOffset = await _ringBuffer.AllocAsync(readLength, cancellationToken);
						Memory<byte> data = _ringBuffer.GetData(bufferOffset, readLength);
						Task readTask = await input.StartReadAsync(data, cancellationToken);
						sourceOffset += readLength;

						// Post the request to handle the result
						TaskCompletionSource<Block>? nextBlockTcs;
						if (sourceOffset >= input.Length)
						{
							nextBlockTcs = null;
						}
						else
						{
							nextBlockTcs = new TaskCompletionSource<Block>(TaskCreationOptions.RunContinuationsAsynchronously);
						}

						BlockRequest blockRequest = new BlockRequest(bufferOffset, readLength, readTask, blockTcs, nextBlockTcs?.Task);
						await writer.WriteAsync(blockRequest, cancellationToken);
						blockTcs = nextBlockTcs;
					}
				}
			}
			writer.Complete();
		}

		// Worker task (run with parallelism 32): waits for each block's data, scans it for candidate split
		// points, and publishes the finished block to the awaiting output.
		async Task ProcessBlockRequestsAsync(CancellationToken cancellationToken)
		{
			ChannelReader<BlockRequest> reader = _blockRequestChannel.Reader;
			while (await reader.WaitToReadAsync(cancellationToken))
			{
				while (reader.TryRead(out BlockRequest? blockRequest))
				{
					// Wait until the data has been read
					await blockRequest.ReadTask;

					// Find the split points
					Memory<byte> data = _ringBuffer.GetData(blockRequest.BufferOffset, blockRequest.Length);
					List<(int, uint)> boundaries = BuzHash.FindCandidateSplitPoints(data.Span, _options.WindowSize, _options.Threshold);

					// Create the block
					Block block = new Block(blockRequest.BufferOffset, blockRequest.Length, boundaries, blockRequest.NextBlock);
					blockRequest.BlockTcs.SetResult(block);
				}
			}
		}
	}
}