// Copyright Epic Games, Inc. All Rights Reserved. using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using EpicGames.Core; using EpicGames.Horde.Storage; using Microsoft.Extensions.Logging; namespace EpicGames.Horde.Logs { /// /// Format for the log file /// public enum LogFormat { /// /// Text data /// Text = 0, /// /// Json data /// Json = 1, } /// /// Represents an entire log /// [BlobConverter(typeof(LogNodeConverter))] public class LogNode { /// /// Format for this log file /// public LogFormat Format { get; } /// /// Total number of lines /// public int LineCount { get; } /// /// Length of the log file /// public long Length { get; } /// /// Text blocks for this chunk /// public IReadOnlyList TextChunkRefs { get; } /// /// Index for this log /// public IHashedBlobRef IndexRef { get; } /// /// Whether this log is complete /// public bool Complete { get; } /// /// Deserializing constructor /// public LogNode(LogFormat format, int lineCount, long length, IReadOnlyList textChunkRefs, IHashedBlobRef indexRef, bool complete) { Format = format; LineCount = lineCount; Length = length; TextChunkRefs = textChunkRefs.ToArray(); IndexRef = indexRef; Complete = complete; } } /// /// Serializer for types /// class LogNodeConverter : BlobConverter { /// /// Type of blob when serialized to storage /// public static BlobType BlobType { get; } = new BlobType("{274DF8F7-4B4F-9E87-8C31-D58A33AD25DB}", 1); /// public override LogNode Read(IBlobReader reader, BlobSerializerOptions options) { LogFormat format = (LogFormat)reader.ReadUInt8(); int lineCount = (int)reader.ReadUnsignedVarInt(); long length = (long)reader.ReadUnsignedVarInt(); IHashedBlobRef indexRef = reader.ReadBlobRef(); List textChunkRefs = reader.ReadList(() => new LogChunkRef(reader)); bool complete = reader.ReadBoolean(); return new LogNode(format, lineCount, length, textChunkRefs, indexRef, complete); } /// public override BlobType Write(IBlobWriter writer, LogNode value, BlobSerializerOptions options) { writer.WriteUInt8((byte)value.Format); writer.WriteUnsignedVarInt(value.LineCount); writer.WriteUnsignedVarInt((ulong)value.Length); writer.WriteBlobRef(value.IndexRef); writer.WriteList(value.TextChunkRefs, x => x.Serialize(writer)); writer.WriteBoolean(value.Complete); return BlobType; } } /// /// Assists building log files through trees of , and nodes. This /// class is designed to be thread safe, and presents a consistent view to readers and writers. /// public class LogBuilder { /// /// Default maximum size for a log text block /// public const int DefaultTextBlockLength = 256 * 1024; /// /// Default maximum size for an index text block /// public const int DefaultIndexBlockLength = 64 * 1024; /// /// Number of lines written to the log /// public int LineCount => _lineCount; int _lineCount; readonly LogFormat _format; // Data for the log file which has been flushed to disk so far LogNode? _root; LogIndexNode _index = LogIndexNode.Empty; // Json data read but not flushed readonly LogChunkSequenceBuilder _textBuilder; readonly LogChunkSequenceBuilder _indexTextBuilder; // Lock object for access to the fields above readonly object _lockObject = new object(); // Inner log device; used for messaging encoding errors. readonly ILogger _logger; /// /// Constructor /// /// Format for data in the log file /// public LogBuilder(LogFormat format, ILogger logger) : this(format, DefaultTextBlockLength, DefaultIndexBlockLength, logger) { } /// /// Constructor /// /// Format of data in the log file /// maximum size for a regular text block /// Maximum size for an index text block /// Logger for conversion errors public LogBuilder(LogFormat format, int maxTextBlockLength, int maxIndexBlockLength, ILogger logger) { _format = format; _textBuilder = new LogChunkSequenceBuilder(maxTextBlockLength); _indexTextBuilder = new LogChunkSequenceBuilder(maxIndexBlockLength); _logger = logger; } /// /// Read data from the unflushed log tail /// /// The first line to read, from the end of the flushed data /// /// public (int LineIdx, ReadOnlyMemory Data) ReadTailData(int firstLineIdx, int maxLength) { lock (_lockObject) { // Clamp the first line index to the first available int flushedLineCount = _root?.LineCount ?? 0; firstLineIdx = Math.Max(firstLineIdx, flushedLineCount); // Measure the size of buffer required for the tail data int length = 0; int lineCount = 0; foreach (Utf8String line in _textBuilder.EnumerateLines(firstLineIdx)) { int nextLength = length + line.Length; if (length > 0 && nextLength > maxLength) { break; } length = nextLength; lineCount++; } // Allocate the buffer byte[] buffer = new byte[length]; // Copy lines into the buffer Span output = buffer.AsSpan(); foreach (Utf8String line in _textBuilder.EnumerateLines(firstLineIdx).Take(lineCount)) { line.Span.CopyTo(output); output = output.Slice(line.Length); } Debug.Assert(output.Length == 0); return (firstLineIdx, buffer); } } /// /// Append JSON data to the end of the log /// /// Log data to append public void WriteData(ReadOnlyMemory data) { lock (_lockObject) { ReadOnlyMemory remaining = data; for (; ; ) { int newlineIdx = remaining.Span.IndexOf((byte)'\n'); if (newlineIdx == -1) { break; } ReadOnlyMemory line = remaining.Slice(0, newlineIdx + 1); _textBuilder.Append(line.Span); _lineCount++; if (_format == LogFormat.Json) { _indexTextBuilder.AppendJsonAsPlainText(line.Span, _logger); } else { _indexTextBuilder.Append(line.Span); } remaining = remaining.Slice(newlineIdx + 1); } } } /// /// Flushes the written data to the log /// /// Writer for the output nodes /// Whether the log is complete /// Cancellation token for the operation public async Task> FlushAsync(IBlobWriter writer, bool complete, CancellationToken cancellationToken) { // Capture the new data that needs to be written IReadOnlyList writeTextChunks; IReadOnlyList writeIndexTextChunks; lock (_lockObject) { _textBuilder.Flush(); writeTextChunks = _textBuilder.Chunks.ToArray(); _indexTextBuilder.Flush(); writeIndexTextChunks = _indexTextBuilder.Chunks.ToArray(); } // Flush any complete chunks to storage LogIndexNode newIndex = await _index.AppendAsync(writer, writeIndexTextChunks, cancellationToken); IHashedBlobRef newIndexRef = await writer.WriteBlobAsync(newIndex, cancellationToken); List newJsonChunkRefs = new List(_root?.TextChunkRefs ?? Array.Empty()); int lineCount = _root?.LineCount ?? 0; long length = _root?.Length ?? 0; foreach (LogChunkNode writeTextChunk in writeTextChunks) { IHashedBlobRef writeTextChunkRef = await writer.WriteBlobAsync(writeTextChunk, cancellationToken); newJsonChunkRefs.Add(new LogChunkRef(lineCount, writeTextChunk.LineCount, length, writeTextChunk.Length, writeTextChunkRef)); lineCount += writeTextChunk.LineCount; length += writeTextChunk.Length; } LogNode newRoot = new LogNode(_format, lineCount, length, newJsonChunkRefs, newIndexRef, complete); IHashedBlobRef newRootRef = await writer.WriteBlobAsync(newRoot, cancellationToken); await writer.FlushAsync(cancellationToken); // Update the new state lock (_lockObject) { _root = newRoot; _index = newIndex; _textBuilder.Remove(writeTextChunks.Count); _indexTextBuilder.Remove(writeIndexTextChunks.Count); } return newRootRef; } } /// /// Extension methods /// public static class LogNodeExtensions { /// /// Reads lines from a line /// /// Log to read from /// Cancellation token /// Sequence of line buffers public static async IAsyncEnumerable> ReadLogAsync(this LogNode logNode, [EnumeratorCancellation] CancellationToken cancellationToken = default) { foreach (LogChunkRef textChunkRef in logNode.TextChunkRefs) { LogChunkNode textChunk = await textChunkRef.Target.ReadBlobAsync(cancellationToken); yield return textChunk.Data; } } /// /// Reads lines from a line /// /// Log to read from /// Zero-based index of the first line to read from /// Cancellation token /// Sequence of line buffers public static async IAsyncEnumerable> ReadLogLinesAsync(this LogNode logNode, int index, [EnumeratorCancellation] CancellationToken cancellationToken = default) { foreach (LogChunkRef textChunkRef in logNode.TextChunkRefs) { int lineIdx = Math.Max(index - textChunkRef.LineIndex, 0); if (lineIdx < textChunkRef.LineCount) { LogChunkNode textChunk = await textChunkRef.Target.ReadBlobAsync(cancellationToken); int offset = textChunk.LineOffsets[lineIdx]; for (; lineIdx < textChunk.LineCount; lineIdx++) { int nextOffset = textChunk.LineOffsets[lineIdx + 1]; ReadOnlyMemory line = textChunk.Data.Slice(offset, nextOffset - offset); yield return line; offset = nextOffset; } } } } } }