// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using EpicGames.Core;
using EpicGames.Horde.Storage;
using Microsoft.Extensions.Logging;
namespace EpicGames.Horde.Logs
{
/// <summary>
/// Format for the log file
/// </summary>
public enum LogFormat
{
	/// <summary>
	/// Text data
	/// </summary>
	Text = 0,

	/// <summary>
	/// Json data
	/// </summary>
	Json = 1,
}
/// <summary>
/// Represents an entire log
/// </summary>
[BlobConverter(typeof(LogNodeConverter))]
public class LogNode
{
	/// <summary>
	/// Format for this log file
	/// </summary>
	public LogFormat Format { get; }

	/// <summary>
	/// Total number of lines
	/// </summary>
	public int LineCount { get; }

	/// <summary>
	/// Length of the log file
	/// </summary>
	public long Length { get; }

	/// <summary>
	/// Text blocks for this chunk
	/// </summary>
	public IReadOnlyList<LogChunkRef> TextChunkRefs { get; }

	/// <summary>
	/// Index for this log
	/// </summary>
	public IHashedBlobRef IndexRef { get; }

	/// <summary>
	/// Whether this log is complete
	/// </summary>
	public bool Complete { get; }

	/// <summary>
	/// Deserializing constructor
	/// </summary>
	/// <param name="format">Format for data in the log file</param>
	/// <param name="lineCount">Total number of lines in the log</param>
	/// <param name="length">Length of the log data</param>
	/// <param name="textChunkRefs">References to the chunks of log text</param>
	/// <param name="indexRef">Reference to the index for this log</param>
	/// <param name="complete">Whether the log is complete</param>
	public LogNode(LogFormat format, int lineCount, long length, IReadOnlyList<LogChunkRef> textChunkRefs, IHashedBlobRef indexRef, bool complete)
	{
		Format = format;
		LineCount = lineCount;
		Length = length;
		// Defensive copy; later mutation of the caller's list cannot change this node
		TextChunkRefs = textChunkRefs.ToArray();
		IndexRef = indexRef;
		Complete = complete;
	}
}
/// <summary>
/// Serializer for <see cref="LogNode"/> types
/// </summary>
class LogNodeConverter : BlobConverter<LogNode>
{
	/// <summary>
	/// Type of blob when serialized to storage
	/// </summary>
	public static BlobType BlobType { get; } = new BlobType("{274DF8F7-4B4F-9E87-8C31-D58A33AD25DB}", 1);

	/// <inheritdoc/>
	public override LogNode Read(IBlobReader reader, BlobSerializerOptions options)
	{
		// Field order must mirror Write() below
		LogFormat format = (LogFormat)reader.ReadUInt8();
		int lineCount = (int)reader.ReadUnsignedVarInt();
		long length = (long)reader.ReadUnsignedVarInt();
		IHashedBlobRef indexRef = reader.ReadBlobRef();
		List<LogChunkRef> textChunkRefs = reader.ReadList(() => new LogChunkRef(reader));
		bool complete = reader.ReadBoolean();
		return new LogNode(format, lineCount, length, textChunkRefs, indexRef, complete);
	}

	/// <inheritdoc/>
	public override BlobType Write(IBlobWriter writer, LogNode value, BlobSerializerOptions options)
	{
		writer.WriteUInt8((byte)value.Format);
		writer.WriteUnsignedVarInt(value.LineCount);
		writer.WriteUnsignedVarInt((ulong)value.Length);
		writer.WriteBlobRef(value.IndexRef);
		writer.WriteList(value.TextChunkRefs, x => x.Serialize(writer));
		writer.WriteBoolean(value.Complete);
		return BlobType;
	}
}
/// <summary>
/// Assists building log files through trees of <see cref="LogNode"/>, <see cref="LogChunkNode"/> and <see cref="LogIndexNode"/> nodes. This
/// class is designed to be thread safe, and presents a consistent view to readers and writers.
/// </summary>
public class LogBuilder
{
	/// <summary>
	/// Default maximum size for a log text block
	/// </summary>
	public const int DefaultTextBlockLength = 256 * 1024;

	/// <summary>
	/// Default maximum size for an index text block
	/// </summary>
	public const int DefaultIndexBlockLength = 64 * 1024;

	/// <summary>
	/// Number of lines written to the log
	/// </summary>
	public int LineCount => _lineCount;
	int _lineCount;

	readonly LogFormat _format;

	// Data for the log file which has been flushed to disk so far
	LogNode? _root;
	LogIndexNode _index = LogIndexNode.Empty;

	// Json data read but not flushed
	readonly LogChunkSequenceBuilder _textBuilder;
	readonly LogChunkSequenceBuilder _indexTextBuilder;

	// Lock object for access to the fields above
	readonly object _lockObject = new object();

	// Inner log device; used for messaging encoding errors.
	readonly ILogger _logger;

	/// <summary>
	/// Constructor
	/// </summary>
	/// <param name="format">Format for data in the log file</param>
	/// <param name="logger">Logger for conversion errors</param>
	public LogBuilder(LogFormat format, ILogger logger)
		: this(format, DefaultTextBlockLength, DefaultIndexBlockLength, logger)
	{
	}

	/// <summary>
	/// Constructor
	/// </summary>
	/// <param name="format">Format of data in the log file</param>
	/// <param name="maxTextBlockLength">Maximum size for a regular text block</param>
	/// <param name="maxIndexBlockLength">Maximum size for an index text block</param>
	/// <param name="logger">Logger for conversion errors</param>
	public LogBuilder(LogFormat format, int maxTextBlockLength, int maxIndexBlockLength, ILogger logger)
	{
		_format = format;
		_textBuilder = new LogChunkSequenceBuilder(maxTextBlockLength);
		_indexTextBuilder = new LogChunkSequenceBuilder(maxIndexBlockLength);
		_logger = logger;
	}

	/// <summary>
	/// Read data from the unflushed log tail
	/// </summary>
	/// <param name="firstLineIdx">The first line to read, from the end of the flushed data</param>
	/// <param name="maxLength">Maximum amount of data to return</param>
	/// <returns>The (possibly clamped) index of the first line returned, and the buffered tail data</returns>
	public (int LineIdx, ReadOnlyMemory<byte> Data) ReadTailData(int firstLineIdx, int maxLength)
	{
		lock (_lockObject)
		{
			// Clamp the first line index to the first available
			int flushedLineCount = _root?.LineCount ?? 0;
			firstLineIdx = Math.Max(firstLineIdx, flushedLineCount);

			// Measure the size of buffer required for the tail data. At least one line is always
			// included, even if it exceeds maxLength on its own.
			int length = 0;
			int lineCount = 0;
			foreach (Utf8String line in _textBuilder.EnumerateLines(firstLineIdx))
			{
				int nextLength = length + line.Length;
				if (length > 0 && nextLength > maxLength)
				{
					break;
				}
				length = nextLength;
				lineCount++;
			}

			// Allocate the buffer
			byte[] buffer = new byte[length];

			// Copy lines into the buffer
			Span<byte> output = buffer.AsSpan();
			foreach (Utf8String line in _textBuilder.EnumerateLines(firstLineIdx).Take(lineCount))
			{
				line.Span.CopyTo(output);
				output = output.Slice(line.Length);
			}
			Debug.Assert(output.Length == 0);

			return (firstLineIdx, buffer);
		}
	}

	/// <summary>
	/// Append JSON data to the end of the log
	/// </summary>
	/// <param name="data">Log data to append. Any trailing partial line (no terminating newline) is ignored.</param>
	public void WriteData(ReadOnlyMemory<byte> data)
	{
		lock (_lockObject)
		{
			ReadOnlyMemory<byte> remaining = data;
			for (; ; )
			{
				int newlineIdx = remaining.Span.IndexOf((byte)'\n');
				if (newlineIdx == -1)
				{
					break;
				}

				ReadOnlyMemory<byte> line = remaining.Slice(0, newlineIdx + 1);
				_textBuilder.Append(line.Span);
				_lineCount++;

				// The index is built from plain text, so JSON lines are converted before appending
				if (_format == LogFormat.Json)
				{
					_indexTextBuilder.AppendJsonAsPlainText(line.Span, _logger);
				}
				else
				{
					_indexTextBuilder.Append(line.Span);
				}

				remaining = remaining.Slice(newlineIdx + 1);
			}
		}
	}

	/// <summary>
	/// Flushes the written data to the log
	/// </summary>
	/// <param name="writer">Writer for the output nodes</param>
	/// <param name="complete">Whether the log is complete</param>
	/// <param name="cancellationToken">Cancellation token for the operation</param>
	/// <returns>Reference to the new root node for the log</returns>
	public async Task<IHashedBlobRef> FlushAsync(IBlobWriter writer, bool complete, CancellationToken cancellationToken)
	{
		// Capture the new data that needs to be written
		IReadOnlyList<LogChunkNode> writeTextChunks;
		IReadOnlyList<LogChunkNode> writeIndexTextChunks;
		lock (_lockObject)
		{
			_textBuilder.Flush();
			writeTextChunks = _textBuilder.Chunks.ToArray();

			_indexTextBuilder.Flush();
			writeIndexTextChunks = _indexTextBuilder.Chunks.ToArray();
		}

		// Flush any complete chunks to storage
		LogIndexNode newIndex = await _index.AppendAsync(writer, writeIndexTextChunks, cancellationToken);
		IHashedBlobRef newIndexRef = await writer.WriteBlobAsync(newIndex, cancellationToken);

		// Append the new chunks to the list already flushed on the previous root, tracking
		// the running line count and byte length for each chunk's offsets.
		List<LogChunkRef> newJsonChunkRefs = new List<LogChunkRef>(_root?.TextChunkRefs ?? Array.Empty<LogChunkRef>());
		int lineCount = _root?.LineCount ?? 0;
		long length = _root?.Length ?? 0;
		foreach (LogChunkNode writeTextChunk in writeTextChunks)
		{
			IHashedBlobRef writeTextChunkRef = await writer.WriteBlobAsync(writeTextChunk, cancellationToken);
			newJsonChunkRefs.Add(new LogChunkRef(lineCount, writeTextChunk.LineCount, length, writeTextChunk.Length, writeTextChunkRef));
			lineCount += writeTextChunk.LineCount;
			length += writeTextChunk.Length;
		}

		LogNode newRoot = new LogNode(_format, lineCount, length, newJsonChunkRefs, newIndexRef, complete);
		IHashedBlobRef newRootRef = await writer.WriteBlobAsync(newRoot, cancellationToken);

		await writer.FlushAsync(cancellationToken);

		// Update the new state. Only the chunks captured above are removed from the builders,
		// so data appended while this flush was in progress is retained for the next flush.
		lock (_lockObject)
		{
			_root = newRoot;
			_index = newIndex;
			_textBuilder.Remove(writeTextChunks.Count);
			_indexTextBuilder.Remove(writeIndexTextChunks.Count);
		}

		return newRootRef;
	}
}
/// <summary>
/// Extension methods for reading logs
/// </summary>
public static class LogNodeExtensions
{
	/// <summary>
	/// Reads the raw chunk data from a log
	/// </summary>
	/// <param name="logNode">Log to read from</param>
	/// <param name="cancellationToken">Cancellation token</param>
	/// <returns>Sequence of chunk data buffers</returns>
	public static async IAsyncEnumerable<ReadOnlyMemory<byte>> ReadLogAsync(this LogNode logNode, [EnumeratorCancellation] CancellationToken cancellationToken = default)
	{
		foreach (LogChunkRef textChunkRef in logNode.TextChunkRefs)
		{
			LogChunkNode textChunk = await textChunkRef.Target.ReadBlobAsync(cancellationToken);
			yield return textChunk.Data;
		}
	}

	/// <summary>
	/// Reads individual lines from a log
	/// </summary>
	/// <param name="logNode">Log to read from</param>
	/// <param name="index">Zero-based index of the first line to read from</param>
	/// <param name="cancellationToken">Cancellation token</param>
	/// <returns>Sequence of line buffers</returns>
	public static async IAsyncEnumerable<ReadOnlyMemory<byte>> ReadLogLinesAsync(this LogNode logNode, int index, [EnumeratorCancellation] CancellationToken cancellationToken = default)
	{
		foreach (LogChunkRef textChunkRef in logNode.TextChunkRefs)
		{
			// Skip chunks which end before the requested line; within the first relevant
			// chunk, start at the requested line rather than the chunk start.
			int lineIdx = Math.Max(index - textChunkRef.LineIndex, 0);
			if (lineIdx < textChunkRef.LineCount)
			{
				LogChunkNode textChunk = await textChunkRef.Target.ReadBlobAsync(cancellationToken);

				// Consecutive entries in LineOffsets bracket each line's byte range within the chunk data
				int offset = textChunk.LineOffsets[lineIdx];
				for (; lineIdx < textChunk.LineCount; lineIdx++)
				{
					int nextOffset = textChunk.LineOffsets[lineIdx + 1];
					ReadOnlyMemory<byte> line = textChunk.Data.Slice(offset, nextOffset - offset);
					yield return line;
					offset = nextOffset;
				}
			}
		}
	}
}
}