Files
UnrealEngine/Engine/Source/Runtime/Experimental/IoStore/OnDemand/Private/OnDemandHttpIoDispatcherBackend.cpp
Brandyn / Techy fcc1b09210 init
2026-04-04 15:40:51 -05:00

392 lines
13 KiB
C++

// Copyright Epic Games, Inc. All Rights Reserved.
#include "OnDemandHttpIoDispatcherBackend.h"
#include "Containers/AnsiString.h"
#include "HAL/PlatformTime.h"
#include "IO/HttpIoDispatcher.h"
#include "IO/IoAllocators.h"
#include "IO/IoOffsetLength.h"
#include "IO/IoStatus.h"
#include "IO/IoStoreOnDemand.h"
#include "OnDemandBackendStatus.h"
#include "OnDemandIoStore.h"
#include "OnDemandHttpThread.h" // EHttpRequestType
#include "Statistics.h"
namespace UE::IoStore
{
////////////////////////////////////////////////////////////////////////////////
// Size threshold in KiB, defined in another translation unit. ResolveIoRequests downloads
// the whole encoded chunk when its disk size is at or below this value and issues an
// HTTP range request otherwise.
extern int32 GIasHttpRangeRequestMinSizeKiB;
// Defined in another translation unit. When true, CompleteRequest decodes with
// EIoDecodeFlags::WriteOnly for requests flagged EIoReadOptionsFlags::HardwareTargetBuffer.
extern bool GIasEnableWriteOnlyDecoding;
////////////////////////////////////////////////////////////////////////////////
// I/O dispatcher backend that serves on-demand chunks by issuing HTTP requests through
// FHttpIoDispatcher batches and decoding the responses into the dispatcher request buffer.
class FHttpIoDispatcherBackend
: public IOnDemandHttpIoDispatcherBackend
{
using FSharedBackendCtx = TSharedPtr<const FIoDispatcherBackendContext>;
// Per-request bookkeeping that ties a dispatcher request to its chunk info, the encoded
// byte range being fetched and the in-flight HTTP request handle.
struct FResolvedRequest
{
// Registers this object as the dispatcher request's BackendData and records the start
// time in platform cycles. The dispatcher request must not already carry backend data.
FResolvedRequest(FIoRequestImpl& InRequest, FOnDemandChunkInfo&& InChunkInfo, const FIoOffsetAndLength& InChunkRange)
: DispatcherRequest(InRequest)
, ChunkInfo(MoveTemp(InChunkInfo))
, ChunkRange(InChunkRange)
, StartTimeCycles(FPlatformTime::Cycles64())
{
check(DispatcherRequest.BackendData == nullptr);
DispatcherRequest.BackendData = this;
}
// Returns the resolved request stored in BackendData (nullptr when none is attached).
static FResolvedRequest* Get(FIoRequestImpl& DispatcherRequest)
{
return reinterpret_cast<FResolvedRequest*>(DispatcherRequest.BackendData);
}
// Constructs a resolved request from the shared slab allocator.
static FResolvedRequest* Create(FIoRequestImpl& DispatcherRequest, FOnDemandChunkInfo&& ChunkInfo, const FIoOffsetAndLength& ChunkRange)
{
return Allocator.Construct(DispatcherRequest, MoveTemp(ChunkInfo), ChunkRange);
}
// Detaches the resolved request from its dispatcher request and returns it to the allocator.
static void Destroy(FResolvedRequest* ResolvedRequest)
{
check(ResolvedRequest->DispatcherRequest.BackendData == ResolvedRequest);
ResolvedRequest->DispatcherRequest.BackendData = nullptr;
Allocator.Destroy(ResolvedRequest);
}
// NOTE(review): the allocator type is named "single threaded"; Create is called from
// ResolveIoRequests and Destroy from GetCompletedIoRequests - confirm both always run
// on the same thread.
using FAllocator = TSingleThreadedSlabAllocator<FResolvedRequest, 64>;
static FAllocator Allocator;
// The dispatcher request this state belongs to.
FIoRequestImpl& DispatcherRequest;
// Chunk metadata used to build the URL and the decoding parameters.
FOnDemandChunkInfo ChunkInfo;
// Encoded byte range requested over HTTP.
FIoOffsetAndLength ChunkRange;
// Handle used to cancel or re-prioritize the HTTP request.
FIoHttpRequest HttpRequest;
// FPlatformTime::Cycles64() value captured at construction; used to compute the duration.
uint64 StartTimeCycles;
};
public:
FHttpIoDispatcherBackend(FOnDemandIoStore& IoStore);
// IOnDemandHttpIoDispatcherBackend interface
virtual void SetOptionalBulkDataEnabled(bool bEnabled) override;
virtual void ReportAnalytics(TArray<FAnalyticsEventAttribute>& OutAnalyticsArray) const override;
// I/O dispatcher backend interface
virtual void Initialize(TSharedRef<const FIoDispatcherBackendContext> Context) override;
virtual void Shutdown() override;
virtual void ResolveIoRequests(FIoRequestList Requests, FIoRequestList& OutUnresolved) override;
virtual FIoRequestImpl* GetCompletedIoRequests() override;
virtual void CancelIoRequest(FIoRequestImpl* Request) override;
virtual void UpdatePriorityForIoRequest(FIoRequestImpl* Request) override;
virtual bool DoesChunkExist(const FIoChunkId& ChunkId) const override;
virtual bool DoesChunkExist(const FIoChunkId& ChunkId, const FIoOffsetAndLength& ChunkRange) const override;
virtual TIoStatusOr<uint64> GetSizeForChunk(const FIoChunkId& ChunkId) const override;
// Memory mapping is not offered by this backend; always reports NotFound.
virtual TIoStatusOr<FIoMappedRegion> OpenMapped(const FIoChunkId& ChunkId, const FIoReadOptions& Options) override { return FIoStatus(EIoErrorCode::NotFound); }
// NOTE(review): declared virtual without 'override' - confirm whether the base interface declares GetName.
virtual const TCHAR* GetName() const { return TEXT("HttpIoDispatcherBackend"); }
private:
// HTTP completion handler: decodes the response, updates stats and queues the request as completed.
void CompleteRequest(FResolvedRequest& ResolvedRequst, FIoHttpResponse&& HttpResponse);
// Writes a VeryVerbose log line describing a finished (non-cancelled) request.
static void LogRequest(FResolvedRequest& ResolvedRequest, FIoRequestImpl& DispacherRequest, bool bDecoded, bool bCached, bool bCancelled, uint64 DurationMs);
// Must stay declared before Stats: the constructor initializes Stats from BackendStatus.
FBackendStatus BackendStatus;
FOnDemandIoBackendStats Stats;
FOnDemandIoStore& IoStore;
// Requests finished by CompleteRequest and not yet handed back; accessed under Mutex.
FIoRequestList CompletedRequests;
FMutex Mutex;
// Set in Initialize and cleared in Shutdown; used to wake the dispatcher thread.
FSharedBackendCtx BackendContext;
};
// Out-of-class definition of the static slab allocator shared by all FResolvedRequest instances.
FHttpIoDispatcherBackend::FResolvedRequest::FAllocator FHttpIoDispatcherBackend::FResolvedRequest::Allocator;
////////////////////////////////////////////////////////////////////////////////
// Binds the backend to the on-demand I/O store used for chunk lookups and starts out
// with both the HTTP and the cache status flags switched on.
FHttpIoDispatcherBackend::FHttpIoDispatcherBackend(FOnDemandIoStore& InIoStore)
	: Stats(BackendStatus) // BackendStatus is declared ahead of Stats, so it is already constructed here
	, IoStore(InIoStore)
{
	BackendStatus.SetHttpEnabled(true);
	BackendStatus.SetCacheEnabled(true);
}
// Forwards the optional-bulk-data toggle to the backend status object.
void FHttpIoDispatcherBackend::SetOptionalBulkDataEnabled(bool bEnabled)
{
	BackendStatus.SetHttpOptionalBulkEnabled(bEnabled);
}
// Appends this backend's analytics attributes to OutAnalyticsArray:
// the general statistics first, followed by the end point statistics.
void FHttpIoDispatcherBackend::ReportAnalytics(TArray<FAnalyticsEventAttribute>& OutAnalyticsArray) const
{
	Stats.ReportGeneralAnalytics(OutAnalyticsArray);
	Stats.ReportEndPointAnalytics(OutAnalyticsArray);
}
// Keeps a shared reference to the dispatcher context; CompleteRequest uses it later
// to wake the dispatcher thread.
void FHttpIoDispatcherBackend::Initialize(TSharedRef<const FIoDispatcherBackendContext> Context)
{
	BackendContext = FSharedBackendCtx(Context);
}
// Drops the reference to the dispatcher context acquired in Initialize.
void FHttpIoDispatcherBackend::Shutdown()
{
	BackendContext.Reset();
}
// Drains Requests and, for every request this backend can serve, queues an HTTP GET in a
// single batch that is issued once the whole list has been processed.
//
// A request is handed back through OutUnresolved when the store has no valid streaming
// chunk info for it, when the chunk's host group is not OK, or when HTTP is disabled for
// the chunk type.
void FHttpIoDispatcherBackend::ResolveIoRequests(FIoRequestList Requests, FIoRequestList& OutUnresolved)
{
	const int32 HttpRetryCount = 2;
	FIoHttpBatch Batch = FHttpIoDispatcher::NewBatch();

	for (FIoRequestImpl* Request = Requests.PopHead(); Request != nullptr; Request = Requests.PopHead())
	{
		FOnDemandChunkInfo ChunkInfo = IoStore.GetStreamingChunkInfo(Request->ChunkId);

		// The checks run in this order and stop at the first one that rejects the request.
		const bool bCannotServe =
			!ChunkInfo.IsValid() ||
			FHttpIoDispatcher::IsHostGroupOk(ChunkInfo.HostGroupName()) == false ||
			BackendStatus.IsHttpEnabled(Request->ChunkId.GetChunkType()) == false;
		if (bCannotServe)
		{
			OutUnresolved.AddTail(Request);
			continue;
		}

		// Clamp the requested size to what remains of the raw chunk after the offset.
		// NOTE(review): the subtraction is unsigned and would wrap if the offset exceeded
		// the raw size - presumably validated by the caller, confirm.
		const uint64 RequestOffset = Request->Options.GetOffset();
		const uint64 ResolvedSize = FMath::Min<uint64>(Request->Options.GetSize(), ChunkInfo.RawSize() - RequestOffset);
		Request->Options.SetRange(RequestOffset, ResolvedSize);

		FIoOffsetAndLength ChunkRange;
		const uint64 ChunkSize = ChunkInfo.ChunkEntry().GetDiskSize(); // AES aligned encoded size
		const uint64 RangeRequestMinSize = uint64(GIasHttpRangeRequestMinSizeKiB) << 10;
		if (ChunkSize > RangeRequestMinSize)
		{
			// Only fetch the encoded blocks that cover the requested raw range.
			ChunkRange = FIoChunkEncoding::GetChunkRange(
				ChunkInfo.RawSize(),
				ChunkInfo.BlockSize(),
				ChunkInfo.Blocks(),
				RequestOffset,
				ResolvedSize).ConsumeValueOrDie();
		}
		else
		{
			// At or below the threshold: download the entire encoded chunk.
			ChunkRange = FIoOffsetAndLength(0, ChunkSize);
		}

		const FIoHttpRange HttpRange = FIoHttpRange::FromOffsetAndLength(ChunkRange);
		FResolvedRequest* ResolvedRequest = FResolvedRequest::Create(*Request, MoveTemp(ChunkInfo), ChunkRange);

		// ChunkInfo was moved into the resolved request, so read everything from there.
		ResolvedRequest->HttpRequest = Batch.Get(
			ResolvedRequest->ChunkInfo.HostGroupName(),
			ResolvedRequest->ChunkInfo.RelativeUrl(),
			FIoHttpHeaders(),
			FIoHttpOptions(Request->Priority, HttpRetryCount, EIoHttpFlags::Default, HttpRange),
			ResolvedRequest->ChunkInfo.ChunkEntry().Hash,
			[this, ResolvedRequest](FIoHttpResponse&& HttpResponse)
			{
				// Callbacks are always triggered from the task pool
				CompleteRequest(*ResolvedRequest, MoveTemp(HttpResponse));
			});
	}

	Batch.Issue();
}
// Takes ownership of every request queued by CompleteRequest, releases the per-request
// backend state and returns the head of the resulting list.
FIoRequestImpl* FHttpIoDispatcherBackend::GetCompletedIoRequests()
{
	FIoRequestList Finished;
	{
		// Swap the shared list out under the lock so the cleanup below runs without it.
		TUniqueLock Lock(Mutex);
		Finished = MoveTemp(CompletedRequests);
		CompletedRequests = FIoRequestList();
	}

	for (FIoRequestImpl& Completed : Finished)
	{
		FResolvedRequest::Destroy(FResolvedRequest::Get(Completed));
		check(Completed.BackendData == nullptr);
	}

	return Finished.GetHead();
}
// Cancels the HTTP request behind a dispatcher request; does nothing when the request
// carries no backend state.
void FHttpIoDispatcherBackend::CancelIoRequest(FIoRequestImpl* Request)
{
	FResolvedRequest* Resolved = FResolvedRequest::Get(*Request);
	if (Resolved == nullptr)
	{
		return;
	}

	Resolved->HttpRequest.Cancel();
}
// Pushes the dispatcher request's current priority to its HTTP request; does nothing
// when the request carries no backend state.
void FHttpIoDispatcherBackend::UpdatePriorityForIoRequest(FIoRequestImpl* Request)
{
	FResolvedRequest* Resolved = FResolvedRequest::Get(*Request);
	if (Resolved == nullptr)
	{
		return;
	}

	Resolved->HttpRequest.UpdatePriorty(Request->Priority);
}
bool FHttpIoDispatcherBackend::DoesChunkExist(const FIoChunkId& ChunkId) const
{
const TIoStatusOr<uint64> ChunkSize = GetSizeForChunk(ChunkId);
return ChunkSize.IsOk();
}
// Range-aware overload. ChunkRange is not inspected; the answer depends only on the chunk id.
bool FHttpIoDispatcherBackend::DoesChunkExist(const FIoChunkId& ChunkId, const FIoOffsetAndLength& ChunkRange) const
{
	const bool bExists = DoesChunkExist(ChunkId);
	return bExists;
}
// Returns the raw size of a chunk, or EIoErrorCode::UnknownChunkID when the store has no
// valid streaming info for it or HTTP is disabled for its chunk type.
TIoStatusOr<uint64> FHttpIoDispatcherBackend::GetSizeForChunk(const FIoChunkId& ChunkId) const
{
	const FOnDemandChunkInfo ChunkInfo = IoStore.GetStreamingChunkInfo(ChunkId);

	if (ChunkInfo.IsValid() && BackendStatus.IsHttpEnabled(ChunkId.GetChunkType()))
	{
		return ChunkInfo.RawSize();
	}

	return FIoStatus(EIoErrorCode::UnknownChunkID);
}
// Completion handler for an HTTP request created in ResolveIoRequests.
//
// Decodes the response body into the dispatcher request's buffer (unless the request was
// cancelled, the response is not OK, or the body is empty), records statistics, logs the
// outcome, queues the dispatcher request as completed and wakes the dispatcher thread.
void FHttpIoDispatcherBackend::CompleteRequest(FResolvedRequest& ResolvedRequest, FIoHttpResponse&& HttpResponse)
{
	FIoBuffer EncodedChunk = HttpResponse.GetBody();
	FOnDemandChunkInfo& ChunkInfo = ResolvedRequest.ChunkInfo;
	FIoRequestImpl& DispatcherRequest = ResolvedRequest.DispatcherRequest;

	const bool bCancelled = DispatcherRequest.IsCancelled();
	bool bDecoded = false;

	const bool bShouldDecode = !bCancelled && HttpResponse.IsOk() && EncodedChunk.GetSize() > 0;
	if (bShouldDecode)
	{
		FIoChunkDecodingParams DecodeParams;
		DecodeParams.EncryptionKey = ChunkInfo.EncryptionKey();
		DecodeParams.CompressionFormat = ChunkInfo.CompressionFormat();
		DecodeParams.BlockSize = ChunkInfo.BlockSize();
		DecodeParams.TotalRawSize = ChunkInfo.RawSize();
		DecodeParams.EncodedBlockSize = ChunkInfo.Blocks();
		DecodeParams.BlockHash = ChunkInfo.BlockHashes();
		DecodeParams.EncodedOffset = ResolvedRequest.ChunkRange.GetOffset();
		DecodeParams.RawOffset = DispatcherRequest.Options.GetOffset();

		DispatcherRequest.CreateBuffer(DispatcherRequest.Options.GetSize());

		// Write-only decoding is used only when globally enabled and the request targets a hardware buffer.
		const bool bWriteOnly = GIasEnableWriteOnlyDecoding
			&& EnumHasAnyFlags(DispatcherRequest.Options.GetFlags(), EIoReadOptionsFlags::HardwareTargetBuffer);
		const EIoDecodeFlags DecodeFlags = bWriteOnly ? EIoDecodeFlags::WriteOnly : EIoDecodeFlags::None;

		bDecoded = FIoChunkEncoding::Decode(DecodeParams, EncodedChunk.GetView(), DispatcherRequest.GetBuffer().GetMutableView(), DecodeFlags);

		if (!bDecoded)
		{
			if (!HttpResponse.IsCached())
			{
				Stats.OnHttpDecodeError(EHttpRequestType::Streaming);
			}
			else
			{
				// A cached response that fails to decode is evicted from the cache.
				Stats.OnCacheDecodeError();
				FIoStatus Status = FHttpIoDispatcher::EvictFromCache(HttpResponse);
				if (!Status.IsOk())
				{
					UE_LOG(LogHttpIoDispatcher, Error, TEXT("Evict HTTP cache failed, reason '%s'"), *Status.ToString());
				}
			}
		}
	}

	uint64 DurationMs = 0;
	if (ResolvedRequest.StartTimeCycles > 0)
	{
		DurationMs = uint64(FPlatformTime::ToMilliseconds64(FPlatformTime::Cycles64() - ResolvedRequest.StartTimeCycles));
	}

	if (bDecoded)
	{
		// Responses that did not come from the cache are stored in it after a successful decode.
		if (!HttpResponse.IsCached())
		{
			FIoStatus Status = FHttpIoDispatcher::CacheResponse(HttpResponse);
			UE_CLOG(!Status.IsOk(), LogHttpIoDispatcher, Error, TEXT("Put HTTP cache failed, reason '%s'"), *Status.ToString());
		}
		Stats.OnIoRequestComplete(DispatcherRequest.GetBuffer().GetSize(), DurationMs);
	}
	else if (bCancelled)
	{
		Stats.OnIoRequestCancel();
	}
	else
	{
		DispatcherRequest.SetFailed();
		Stats.OnIoRequestError();
	}

	LogRequest(ResolvedRequest, DispatcherRequest, bDecoded, HttpResponse.IsCached(), bCancelled, DurationMs);

	{
		TUniqueLock Lock(Mutex);
		CompletedRequests.AddTail(&DispatcherRequest);
	}

	BackendContext->WakeUpDispatcherThreadDelegate.Execute();
}
// Writes one VeryVerbose LogIas line describing a finished request. Cancelled requests
// are not logged.
//
// ResolvedRequest   Supplies the chunk info (hash, disk size) and the encoded range fetched.
// DispacherRequest  Supplies the chunk id, the requested raw offset, the priority and,
//                   when bDecoded is true, the decoded buffer whose size is reported.
// bDecoded          Whether the response was decoded successfully; selects the normal or
//                   the "-error" prefix.
// bCached           Whether the response came from the cache; selects the "io-cache" or
//                   "io-http" prefix.
// bCancelled        When true the function returns without logging.
// DurationMs        Request duration in milliseconds.
void FHttpIoDispatcherBackend::LogRequest(FResolvedRequest& ResolvedRequest, FIoRequestImpl& DispacherRequest, bool bDecoded, bool bCached, bool bCancelled, uint64 DurationMs)
{
	if (bCancelled)
	{
		return;
	}

	const TCHAR* Prefix = [bDecoded, bCached]() -> const TCHAR*
	{
		if (bDecoded)
		{
			return bCached ? TEXT("io-cache") : TEXT("io-http ");
		}
		return bCached ? TEXT("io-cache-error") : TEXT("io-http-error ");
	}();

	// Maps a numeric priority onto the name of the highest named level it has reached.
	auto PrioToString = [](int32 Prio) -> const TCHAR*
	{
		if (Prio < IoDispatcherPriority_Low)
		{
			return TEXT("Min");
		}
		if (Prio < IoDispatcherPriority_Medium)
		{
			return TEXT("Low");
		}
		if (Prio < IoDispatcherPriority_High)
		{
			return TEXT("Medium");
		}
		if (Prio < IoDispatcherPriority_Max)
		{
			return TEXT("High");
		}
		return TEXT("Max");
	};

	FOnDemandChunkInfo& ChunkInfo = ResolvedRequest.ChunkInfo;
	const uint64 UncompressedSize = bDecoded ? DispacherRequest.GetBuffer().GetSize() : 0;
	const FIoOffsetAndLength& ChunkRange = ResolvedRequest.ChunkRange;
	const uint64 ChunkSize = ChunkInfo.ChunkEntry().GetDiskSize();

	// The chunk id and hash use plain "%s". The previous "% s" carried a space flag, which
	// is only meaningful for signed numeric conversions and is not reliably handled for
	// strings across printf implementations.
	UE_LOG(LogIas, VeryVerbose, TEXT("%s: %5" UINT64_FMT "ms %5" UINT64_FMT "KiB[%8" UINT64_FMT "] %s: %s | Range: %" UINT64_FMT "-%" UINT64_FMT "/%" UINT64_FMT " (%.2f%%) | Prio: %s"),
		Prefix,
		DurationMs,
		UncompressedSize >> 10,
		DispacherRequest.Options.GetOffset(),
		*LexToString(DispacherRequest.ChunkId),
		*LexToString(ChunkInfo.Hash()),
		ChunkRange.GetOffset(), (ChunkRange.GetOffset() + ChunkRange.GetLength() - 1), ChunkSize,
		100.0f * (float(ChunkRange.GetLength()) / float(ChunkSize)),
		PrioToString(DispacherRequest.Priority));
}
////////////////////////////////////////////////////////////////////////////////
// Factory for the HTTP I/O dispatcher backend.
//
// IoStore  The on-demand I/O store the backend queries for streaming chunk info; the
//          backend keeps a reference to it.
// Returns a shared pointer to the new backend, typed as its public interface.
TSharedPtr<IOnDemandHttpIoDispatcherBackend> MakeOnDemandHttpIoDispatcherBackend(FOnDemandIoStore& IoStore)
{
	// MakeShared places the object and its reference controller in one allocation and
	// avoids a naked 'new'.
	return MakeShared<FHttpIoDispatcherBackend>(IoStore);
}
} // namespace UE::IoStore