Files
UnrealEngine/Engine/Source/ThirdParty/Blosc/c-blosc-1.21.0/blosc/blosc.c
Brandyn / Techy fcc1b09210 init
2026-04-04 15:40:51 -05:00

2308 lines
68 KiB
C

/*********************************************************************
Blosc - Blocked Shuffling and Compression Library
Author: Francesc Alted <francesc@blosc.org>
Creation date: 2009-05-20
See LICENSES/BLOSC.txt for details about copyright and rights to use.
**********************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <assert.h>
#include "fastcopy.h"
#if defined(USING_CMAKE)
#include "config.h"
#endif /* USING_CMAKE */
#include "blosc.h"
#include "shuffle.h"
#include "blosclz.h"
#if defined(HAVE_LZ4)
#include "lz4.h"
#include "lz4hc.h"
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
#include "snappy-c.h"
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
#include "zlib.h"
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
#include "zstd.h"
#endif /* HAVE_ZSTD */
#if defined(_WIN32) && !defined(__MINGW32__)
#include <windows.h>
#include <malloc.h>
/* stdint.h only available in VS2010 (VC++ 16.0) and newer */
#if defined(_MSC_VER) && _MSC_VER < 1600
#include "win32/stdint-windows.h"
#else
#include <stdint.h>
#endif
#include <process.h>
#define getpid _getpid
#else
#include <stdint.h>
#include <unistd.h>
#include <inttypes.h>
#endif /* _WIN32 */
/* Include the win32/pthread.h library for all the Windows builds. See #224. */
#if defined(_WIN32)
#include "win32/pthread.h"
#include "win32/pthread.c"
#else
#include <pthread.h>
#endif
/* Some useful units */
#define KB 1024
#define MB (1024 * (KB))
/* Minimum buffer size to be compressed */
#define MIN_BUFFERSIZE 128 /* Cannot be smaller than 66 */
/* The maximum number of splits in a block for compression */
#define MAX_SPLITS 16 /* Cannot be larger than 128 */
/* The size of L1 cache. 32 KB is quite common nowadays. */
#define L1 (32 * (KB))
/* Have problems using posix barriers when symbol value is 200112L */
/* This requires more investigation, but will work for the moment */
#if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
#define _POSIX_BARRIERS_MINE
#endif
/* State for one compression or decompression job: the source and destination
   buffers, the header-derived geometry (type size, block size, block count)
   and the bookkeeping used to drive the worker threads. */
struct blosc_context {
int32_t compress; /* 1 if we are doing compression 0 if decompress */
const uint8_t* src; /* Input buffer */
uint8_t* dest; /* The current pos in the destination buffer */
uint8_t* header_flags; /* Points at the flags byte of the header (dest + 2 when compressing) */
int compversion; /* Compressor version byte, only used during decompression */
int32_t sourcesize; /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
int32_t compressedsize; /* Number of bytes of compressed data (only used when decompressing) */
int32_t nblocks; /* Number of total blocks in buffer */
int32_t leftover; /* Extra bytes at end of buffer (sourcesize % blocksize) */
int32_t blocksize; /* Length of the block in bytes */
int32_t typesize; /* Type size */
int32_t num_output_bytes; /* Counter for the number of output bytes */
int32_t destsize; /* Maximum size for destination buffer */
uint8_t* bstarts; /* Table of 4-byte per-block start offsets, right after the 16-byte header */
int32_t compcode; /* Compressor code to use */
int clevel; /* Compression level (0-9; 0 means plain copy) */
/* Function to use for decompression. Only used when decompression.
   It receives one compressed split and the expected output size and returns
   the number of bytes it produced. */
int (*decompress_func)(const void* input, int compressed_length, void* output,
int maxout);
/* Threading */
int32_t numthreads; /* Number of threads requested for this job */
int32_t threads_started;
int32_t end_threads;
pthread_t threads[BLOSC_MAX_THREADS];
int32_t tids[BLOSC_MAX_THREADS];
pthread_mutex_t count_mutex;
#ifdef _POSIX_BARRIERS_MINE
/* Barriers used by WAIT_INIT / WAIT_FINISH */
pthread_barrier_t barr_init;
pthread_barrier_t barr_finish;
#else
/* Counter, mutex and condition variable used by WAIT_INIT / WAIT_FINISH
   when POSIX barriers are not used */
int32_t count_threads;
pthread_mutex_t count_threads_mutex;
pthread_cond_t count_threads_cv;
#endif
#if !defined(_WIN32)
pthread_attr_t ct_attr; /* creation time attrs for threads */
#endif
int32_t thread_giveup_code; /* set to 1 before a parallel run; a value <= 0 is returned as the error code */
int32_t thread_nblock; /* block counter (reset to -1 before a parallel run) */
};
/* Per-thread state: a link back to the job being processed plus the
   thread's private temporary buffers. */
struct thread_context {
struct blosc_context* parent_context; /* The job this thread belongs to */
int32_t tid; /* Thread identifier */
/* Temporary buffers.  NOTE(review): how tmp/tmp2/tmp3 are carved up is not
   visible in this part of the file -- check the worker-thread code. */
uint8_t* tmp;
uint8_t* tmp2;
uint8_t* tmp3;
int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */
};
/* Global context for non-contextual API */
static struct blosc_context* g_global_context;
/* NOTE(review): presumably serializes access to the global context; the
   code that takes this mutex is not visible in this part of the file. */
static pthread_mutex_t* global_comp_mutex;
static int32_t g_compressor = BLOSC_BLOSCLZ; /* the compressor to use by default */
static int32_t g_threads = 1; /* default number of threads */
static int32_t g_force_blocksize = 0; /* 0 by default; presumably 0 means "compute it automatically" -- confirm against the setter */
static int32_t g_initlib = 0; /* starts at 0; presumably set once the library is initialized -- confirm */
static int32_t g_atfork_registered = 0; /* starts at 0; presumably set once a fork handler is registered -- confirm */
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT; /* policy consulted by split_block() */
/* Wrapped function to adjust the number of threads used by blosc.
   parallel_blosc() treats a negative return value as failure. */
int blosc_set_nthreads_(struct blosc_context*);
/* Releases the global threadpool */
int blosc_release_threadpool(struct blosc_context* context);
/* Macros for synchronization */
/* Wait until all threads are initialized.
 *
 * Both variants expand to several statements (they are not wrapped in
 * do { } while (0)), so use them only as a full statement inside a block.
 *
 * Barrier variant: waits on CONTEXT_PTR->barr_init.  It assigns to a variable
 * named `rc` that must exist in the calling function and, if the wait fails,
 * prints a message and makes the *calling function* return RET_VAL.
 *
 * Fallback variant: under count_threads_mutex, a caller that finds
 * count_threads below numthreads increments the counter and waits on
 * count_threads_cv; otherwise it broadcasts on the condition variable to
 * release the waiters.  RET_VAL is ignored in this variant. */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_INIT(RET_VAL, CONTEXT_PTR) \
rc = pthread_barrier_wait(&CONTEXT_PTR->barr_init); \
if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
printf("Could not wait on barrier (init): %d\n", rc); \
return((RET_VAL)); \
}
#else
#define WAIT_INIT(RET_VAL, CONTEXT_PTR) \
pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
if (CONTEXT_PTR->count_threads < CONTEXT_PTR->numthreads) { \
CONTEXT_PTR->count_threads++; \
pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
} \
else { \
pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
} \
pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
#endif
/* Wait for all threads to finish.
 *
 * Same usage rules as WAIT_INIT: the expansion is several statements, the
 * barrier variant needs a local `rc` and may return RET_VAL from the calling
 * function, and the fallback variant ignores RET_VAL.
 *
 * Fallback variant: under count_threads_mutex, a caller that finds
 * count_threads above zero decrements the counter and waits on
 * count_threads_cv; otherwise it broadcasts on the condition variable to
 * release the waiters. */
#ifdef _POSIX_BARRIERS_MINE
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \
rc = pthread_barrier_wait(&CONTEXT_PTR->barr_finish); \
if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
printf("Could not wait on barrier (finish)\n"); \
return((RET_VAL)); \
}
#else
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR) \
pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
if (CONTEXT_PTR->count_threads > 0) { \
CONTEXT_PTR->count_threads--; \
pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
} \
else { \
pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
} \
pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
#endif
/* Portable aligned allocation for the library's internal buffers.
 *
 * The block is aligned to 32 bytes (because AVX2 is supported) when the
 * platform offers an aligned allocator: _aligned_malloc on Windows,
 * posix_memalign where the POSIX feature macros advertise it.  Otherwise
 * plain malloc() is used and only its default alignment applies.
 *
 * Returns the new block, or NULL after reporting the failure on stderr.
 * Blocks obtained here must be released with my_free(). */
static uint8_t *my_malloc(size_t size)
{
  void *block = NULL;
  int res = 0;

#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  block = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* Platform does have an implementation of posix_memalign */
  res = posix_memalign(&block, 32, size);
#else
  block = malloc(size);
#endif  /* _WIN32 */

  if (block == NULL || res != 0) {
    /* Report on stderr (like every other diagnostic in this file) so that a
       failure cannot corrupt data the application writes to stdout. */
    fprintf(stderr, "Error allocating memory!\n");
    return NULL;
  }
  return (uint8_t *)block;
}
/* Give back a block obtained from my_malloc, using the deallocator that
   matches the allocator chosen for the platform. */
static void my_free(void *block)
{
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif  /* !_WIN32 */
}
/* Decode the 4 bytes at `pa` as a little-endian 32-bit integer.  The result
   is the same on little- and big-endian hosts. */
static int32_t sw32_(const uint8_t *pa)
{
  const uint32_t bits = (uint32_t)pa[0]
                      | ((uint32_t)pa[1] << 8)
                      | ((uint32_t)pa[2] << 16)
                      | ((uint32_t)pa[3] << 24);
  int32_t value;

  /* Reinterpret the bit pattern instead of converting, so values with the
     top bit set come out as the matching negative number. */
  memcpy(&value, &bits, sizeof value);
  return value;
}
/* Store the 32-bit integer `a` into the 4 bytes at `dest` in little-endian
   order.  The bytes written are the same on little- and big-endian hosts. */
static void _sw32(uint8_t* dest, int32_t a)
{
  const uint32_t bits = (uint32_t)a;  /* two's complement bit pattern of a */
  int k;

  for (k = 0; k < 4; k++) {
    dest[k] = (uint8_t)(bits >> (8 * k));  /* least significant byte first */
  }
}
/*
* Conversion routines between compressor and compression libraries
*/
/* Map a compressor name to the code of the library that implements it.
   LZ4 and LZ4HC share the same library.  Returns -1 for an unknown name. */
static int compname_to_clibcode(const char *compname)
{
  int libcode = -1;

  if (!strcmp(compname, BLOSC_BLOSCLZ_COMPNAME)) {
    libcode = BLOSC_BLOSCLZ_LIB;
  }
  else if (!strcmp(compname, BLOSC_LZ4_COMPNAME) ||
           !strcmp(compname, BLOSC_LZ4HC_COMPNAME)) {
    libcode = BLOSC_LZ4_LIB;
  }
  else if (!strcmp(compname, BLOSC_SNAPPY_COMPNAME)) {
    libcode = BLOSC_SNAPPY_LIB;
  }
  else if (!strcmp(compname, BLOSC_ZLIB_COMPNAME)) {
    libcode = BLOSC_ZLIB_LIB;
  }
  else if (!strcmp(compname, BLOSC_ZSTD_COMPNAME)) {
    libcode = BLOSC_ZSTD_LIB;
  }
  return libcode;
}
/* Map a compression library code to the library's name.
   Returns NULL for a code that matches no known library. */
static const char *clibcode_to_clibname(int clibcode)
{
  const char *libname = NULL; /* unknown code: should never happen */

  if (clibcode == BLOSC_BLOSCLZ_LIB) {
    libname = BLOSC_BLOSCLZ_LIBNAME;
  }
  else if (clibcode == BLOSC_LZ4_LIB) {
    libname = BLOSC_LZ4_LIBNAME;
  }
  else if (clibcode == BLOSC_SNAPPY_LIB) {
    libname = BLOSC_SNAPPY_LIBNAME;
  }
  else if (clibcode == BLOSC_ZLIB_LIB) {
    libname = BLOSC_ZLIB_LIBNAME;
  }
  else if (clibcode == BLOSC_ZSTD_LIB) {
    libname = BLOSC_ZSTD_LIBNAME;
  }
  return libname;
}
/*
* Conversion routines between compressor names and compressor codes
*/
/* Look up the name of the compressor identified by `compcode`.
 *
 * `*compname` receives the name, or NULL when the code matches no known
 * compressor.  The name is filled in even if this build lacks support for
 * the compressor.
 *
 * Returns `compcode` when this build supports the compressor, -1 otherwise. */
int blosc_compcode_to_compname(int compcode, const char **compname)
{
  const char *found = NULL;
  int supported = 0;

  if (compcode == BLOSC_BLOSCLZ) {
    found = BLOSC_BLOSCLZ_COMPNAME;
    supported = 1;  /* blosclz is always built in */
  }
  else if (compcode == BLOSC_LZ4) {
    found = BLOSC_LZ4_COMPNAME;
#if defined(HAVE_LZ4)
    supported = 1;
#endif /* HAVE_LZ4 */
  }
  else if (compcode == BLOSC_LZ4HC) {
    found = BLOSC_LZ4HC_COMPNAME;
#if defined(HAVE_LZ4)
    supported = 1;
#endif /* HAVE_LZ4 */
  }
  else if (compcode == BLOSC_SNAPPY) {
    found = BLOSC_SNAPPY_COMPNAME;
#if defined(HAVE_SNAPPY)
    supported = 1;
#endif /* HAVE_SNAPPY */
  }
  else if (compcode == BLOSC_ZLIB) {
    found = BLOSC_ZLIB_COMPNAME;
#if defined(HAVE_ZLIB)
    supported = 1;
#endif /* HAVE_ZLIB */
  }
  else if (compcode == BLOSC_ZSTD) {
    found = BLOSC_ZSTD_COMPNAME;
#if defined(HAVE_ZSTD)
    supported = 1;
#endif /* HAVE_ZSTD */
  }

  *compname = found;
  return supported ? compcode : -1;
}
/* Translate a compressor name into its compressor code.
   Returns -1 when the name is unknown or the compressor was not compiled
   into this build. */
int blosc_compname_to_compcode(const char *compname)
{
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
    return BLOSC_BLOSCLZ;
#if defined(HAVE_LZ4)
  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
    return BLOSC_LZ4;
  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
    return BLOSC_LZ4HC;
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
  if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
    return BLOSC_SNAPPY;
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
    return BLOSC_ZLIB;
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
    return BLOSC_ZSTD;
#endif /* HAVE_ZSTD */
  return -1; /* non-existent (or unsupported) compressor */
}
#if defined(HAVE_LZ4)
/* Compress `input_length` bytes with LZ4 into `output` (capacity `maxout`)
   using acceleration factor `accel`.  Returns whatever LZ4_compress_fast
   returns. */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel)
{
  const int src_len = (int)input_length;
  const int dst_cap = (int)maxout;

  return LZ4_compress_fast(input, output, src_len, dst_cap, accel);
}
/* Compress `input_length` bytes with LZ4HC into `output` (capacity `maxout`)
   at level `clevel`.  Returns -1 if the input is larger than 2 GB, otherwise
   whatever LZ4_compress_HC returns. */
static int lz4hc_wrap_compress(const char* input, size_t input_length,
                               char* output, size_t maxout, int clevel)
{
  const size_t two_gb = (size_t)(UINT32_C(2)<<30);

  if (input_length > two_gb) {
    return -1; /* input larger than 2 GB is not supported */
  }
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
   * but levels larger than 9 do not buy much compression. */
  return LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
                         clevel);
}
/* Decompress `compressed_length` bytes of LZ4 data into `output`, writing at
   most `maxout` bytes.  Returns whatever LZ4_decompress_safe returns. */
static int lz4_wrap_decompress(const void* input, int compressed_length,
                               void* output, int maxout)
{
  const char *src = input;
  char *dst = output;

  return LZ4_decompress_safe(src, dst, compressed_length, maxout);
}
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
/* Compress `input_length` bytes with Snappy into `output` (capacity
   `maxout`).  Returns the compressed size, or 0 if Snappy reports a
   failure. */
static int snappy_wrap_compress(const char* input, size_t input_length,
                                char* output, size_t maxout)
{
  size_t produced = maxout;  /* in: capacity of output, out: bytes written */

  if (snappy_compress(input, input_length, output, &produced) != SNAPPY_OK) {
    return 0;
  }
  return (int)produced;
}
/* Decompress `compressed_length` bytes of Snappy data into `output`
   (capacity `maxout`).  Returns the number of bytes produced, or 0 if Snappy
   reports a failure. */
static int snappy_wrap_decompress(const void* input, int compressed_length,
                                  void* output, int maxout)
{
  size_t produced = maxout;  /* in: capacity of output, out: bytes written */

  if (snappy_uncompress(input, compressed_length, output, &produced) != SNAPPY_OK) {
    return 0;
  }
  return (int)produced;
}
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
/* zlib is not very respectful with sharing name space with others.
   Fortunately, its names do not collide with those already in blosc. */

/* Compress `input_length` bytes with zlib at level `clevel` into `output`
   (capacity `maxout`).  Returns the compressed size, or 0 if compress2 does
   not return Z_OK. */
static int zlib_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel)
{
  uLongf out_len = maxout;  /* in: capacity of output, out: bytes written */
  const int rc = compress2((Bytef*)output, &out_len, (Bytef*)input,
                           (uLong)input_length, clevel);

  return (rc == Z_OK) ? (int)out_len : 0;
}
/* Decompress `compressed_length` bytes of zlib data into `output` (capacity
   `maxout`).  Returns the number of bytes produced, or 0 if uncompress does
   not return Z_OK. */
static int zlib_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout) {
  uLongf out_len = maxout;  /* in: capacity of output, out: bytes written */
  const int rc = uncompress((Bytef*)output, &out_len, (Bytef*)input,
                            (uLong)compressed_length);

  return (rc == Z_OK) ? (int)out_len : 0;
}
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
/* Compress `input_length` bytes with Zstandard into `output` (capacity
   `maxout`).  Blosc levels below 9 are mapped to zstd level 2*clevel - 1;
   level 9 (and above) uses ZSTD_maxCLevel().  Returns the compressed size,
   or 0 if zstd reports an error. */
static int zstd_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel) {
  size_t written;
  int zlevel;

  if (clevel < 9) {
    zlevel = clevel * 2 - 1;
  }
  else {
    zlevel = ZSTD_maxCLevel();
  }
  /* Make the level 8 close enough to maxCLevel.
     NOTE(review): levels below 9 were just mapped to odd numbers, so this
     test can only succeed if ZSTD_maxCLevel() itself returns 8.  It looks as
     if the check was meant to apply to the incoming blosc level -- confirm
     before changing, since that would alter the level used for clevel 8. */
  if (zlevel == 8) {
    zlevel = ZSTD_maxCLevel() - 2;
  }

  written = ZSTD_compress((void*)output, maxout, (void*)input, input_length,
                          zlevel);
  if (ZSTD_isError(written)) {
    return 0;
  }
  return (int)written;
}
/* Decompress `compressed_length` bytes of Zstandard data into `output`
   (capacity `maxout`).  Returns the number of bytes produced, or 0 if zstd
   reports an error. */
static int zstd_wrap_decompress(const void* input, int compressed_length,
                                void* output, int maxout) {
  const size_t produced = ZSTD_decompress((void*)output, maxout,
                                          (void*)input, compressed_length);

  return ZSTD_isError(produced) ? 0 : (int)produced;
}
#endif /* HAVE_ZSTD */
/* Select the decompression routine that matches the compressed buffer.
 *
 * The compressor format is read from the top three bits of the header flags
 * and the compressor version from context->compversion.  On success
 * context->decompress_func is set and 0 is returned.
 *
 * Returns -9 if the format is known but its version byte is not the expected
 * one, and -5 if this build has no decompressor for the format. */
static int initialize_decompress_func(struct blosc_context* context) {
  const int8_t flags = *(context->header_flags);
  const int32_t format = (flags & 0xe0) >> 5;
  const int version = context->compversion;

  if (format == BLOSC_BLOSCLZ_FORMAT) {
    if (version == BLOSC_BLOSCLZ_VERSION_FORMAT) {
      context->decompress_func = &blosclz_decompress;
      return 0;
    }
    return -9;
  }
#if defined(HAVE_LZ4)
  if (format == BLOSC_LZ4_FORMAT) {
    if (version == BLOSC_LZ4_VERSION_FORMAT) {
      context->decompress_func = &lz4_wrap_decompress;
      return 0;
    }
    return -9;
  }
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
  if (format == BLOSC_SNAPPY_FORMAT) {
    if (version == BLOSC_SNAPPY_VERSION_FORMAT) {
      context->decompress_func = &snappy_wrap_decompress;
      return 0;
    }
    return -9;
  }
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
  if (format == BLOSC_ZLIB_FORMAT) {
    if (version == BLOSC_ZLIB_VERSION_FORMAT) {
      context->decompress_func = &zlib_wrap_decompress;
      return 0;
    }
    return -9;
  }
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
  if (format == BLOSC_ZSTD_FORMAT) {
    if (version == BLOSC_ZSTD_VERSION_FORMAT) {
      context->decompress_func = &zstd_wrap_decompress;
      return 0;
    }
    return -9;
  }
#endif /* HAVE_ZSTD */
  return -5; /* signals no decompression support */
}
/* Compute the acceleration factor handed to the compressor.  Only LZ4 uses
   it: the factor is 10 - clevel.  Every other compressor gets 1. */
static int get_accel(const struct blosc_context* context) {
  if (context->compcode != BLOSC_LZ4) {
    return 1;
  }
  /* This acceleration setting based on discussions held in:
   * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
   */
  return 10 - context->clevel;
}
/* Shuffle & compress a single block.
 *
 * context        job description (flags, typesize, compressor, level)
 * blocksize      number of bytes in this block
 * leftoverblock  non-zero for the trailing, shorter block; it is never split
 * ntbytes        bytes already written to the output buffer
 * maxbytes       capacity of the output buffer
 * src            uncompressed block
 * dest           where to write; each split is stored as a 4-byte length
 *                followed by its payload
 * tmp, tmp2      scratch buffers for the shuffle / bitshuffle filters
 *
 * Returns the number of bytes written for the block (length prefixes
 * included), 0 if the data does not fit in the output (non-compressible),
 * or a negative value on error: a bitshuffle error code, -5 when the
 * compressor is not available in this build, -1 if the compressor reports
 * more bytes than it was allowed to write, -2 if it reports a negative
 * count. */
static int blosc_c(const struct blosc_context* context, int32_t blocksize,
int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
const uint8_t *src, uint8_t *dest, uint8_t *tmp,
uint8_t *tmp2)
{
int8_t header_flags = *(context->header_flags);
int dont_split = (header_flags & 0x10) >> 4; /* bit 4 of the flags */
int32_t j, neblock, nsplits;
int32_t cbytes; /* number of compressed bytes in split */
int32_t ctbytes = 0; /* number of compressed bytes in block */
int32_t maxout;
int32_t typesize = context->typesize;
const uint8_t *_tmp = src; /* data handed to the compressor: src, or tmp after filtering */
const char *compname;
int accel;
int bscount;
int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
(blocksize >= typesize));
if (doshuffle) {
/* Byte shuffling only makes sense if typesize > 1 */
blosc_internal_shuffle(typesize, blocksize, src, tmp);
_tmp = tmp;
}
/* We don't allow more than 1 filter at the same time (yet) */
else if (dobitshuffle) {
bscount = blosc_internal_bitshuffle(typesize, blocksize, src, tmp, tmp2);
if (bscount < 0)
return bscount;
_tmp = tmp;
}
/* Calculate acceleration for different compressors */
accel = get_accel(context);
/* The number of splits for this block */
if (!dont_split && !leftoverblock) {
nsplits = typesize;
}
else {
nsplits = 1;
}
/* NOTE(review): any remainder of blocksize / nsplits is not compressed;
   this relies on blocksize being a multiple of typesize when splitting. */
neblock = blocksize / nsplits;
for (j = 0; j < nsplits; j++) {
/* Reserve 4 bytes for the length of this split; it is filled in at the
   bottom of the loop once the compressed size is known. */
dest += sizeof(int32_t);
ntbytes += (int32_t)sizeof(int32_t);
ctbytes += (int32_t)sizeof(int32_t);
maxout = neblock;
#if defined(HAVE_SNAPPY)
if (context->compcode == BLOSC_SNAPPY) {
/* TODO perhaps refactor this to keep the value stashed somewhere */
maxout = snappy_max_compressed_length(neblock);
}
#endif /* HAVE_SNAPPY */
if (ntbytes+maxout > maxbytes) {
maxout = maxbytes - ntbytes; /* avoid buffer overrun */
if (maxout <= 0) {
return 0; /* non-compressible block */
}
}
if (context->compcode == BLOSC_BLOSCLZ) {
cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
dest, maxout);
}
#if defined(HAVE_LZ4)
else if (context->compcode == BLOSC_LZ4) {
cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
(char *)dest, (size_t)maxout, accel);
}
else if (context->compcode == BLOSC_LZ4HC) {
cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
(char *)dest, (size_t)maxout,
context->clevel);
}
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
else if (context->compcode == BLOSC_SNAPPY) {
cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
(char *)dest, (size_t)maxout);
}
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
else if (context->compcode == BLOSC_ZLIB) {
cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
(char *)dest, (size_t)maxout,
context->clevel);
}
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
else if (context->compcode == BLOSC_ZSTD) {
cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
(char*)dest, (size_t)maxout, context->clevel);
}
#endif /* HAVE_ZSTD */
else {
blosc_compcode_to_compname(context->compcode, &compname);
if (compname == NULL) {
compname = "(null)";
}
fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
fprintf(stderr, "compression support. Please use one having it.");
return -5; /* signals no compression support */
}
if (cbytes > maxout) {
/* Buffer overrun caused by compression (should never happen) */
return -1;
}
else if (cbytes < 0) {
/* cbytes should never be negative */
return -2;
}
else if (cbytes == 0 || cbytes == neblock) {
/* The compressor has been unable to compress data at all. */
/* Before doing the copy, check that we are not running into a
buffer overflow. */
if ((ntbytes+neblock) > maxbytes) {
return 0; /* Non-compressible data */
}
/* Store the split verbatim; a stored length equal to neblock is what
   tells blosc_d to copy instead of decompress. */
fastcopy(dest, _tmp + j * neblock, neblock);
cbytes = neblock;
}
/* Fill in the length prefix reserved at the top of the loop */
_sw32(dest - 4, cbytes);
dest += cbytes;
ntbytes += cbytes;
ctbytes += cbytes;
} /* Closes j < nsplits */
return ctbytes;
}
/* Decompress & unshuffle a single block.
 *
 * context        job description (flags, typesize, compressed size,
 *                decompression function)
 * blocksize      number of uncompressed bytes expected for this block
 * leftoverblock  non-zero for the trailing, shorter block; it is never split
 * base_src       start of the whole compressed buffer
 * src_offset     offset of this block inside base_src
 * dest           where the uncompressed block goes
 * tmp, tmp2      scratch buffers used when a shuffle filter is active
 *
 * Every split is stored as a 4-byte length followed by its payload; a length
 * equal to the uncompressed split size means the payload was stored verbatim.
 *
 * Returns the number of uncompressed bytes, -1 if an offset or length read
 * from the buffer falls outside the compressed data, -2 if a split does not
 * decompress to the expected size, or the (negative) bitunshuffle error. */
static int blosc_d(struct blosc_context* context, int32_t blocksize,
                   int32_t leftoverblock, const uint8_t* base_src,
                   int32_t src_offset, uint8_t* dest, uint8_t* tmp,
                   uint8_t* tmp2) {
  int8_t header_flags = *(context->header_flags);
  int dont_split = (header_flags & 0x10) >> 4;
  int32_t j, neblock, nsplits;
  int32_t nbytes;                /* number of decompressed bytes in split */
  const int32_t compressedsize = context->compressedsize;
  int32_t cbytes;                /* number of compressed bytes in split */
  int32_t ctbytes = 0;           /* number of compressed bytes in block */
  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
  uint8_t *_tmp = dest;
  int32_t typesize = context->typesize;
  int bscount;
  int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
  int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
                      (blocksize >= typesize));
  const uint8_t* src;

  /* With a filter active, decompress into tmp and unshuffle into dest */
  if (doshuffle || dobitshuffle) {
    _tmp = tmp;
  }

  /* The number of splits for this block */
  if (!dont_split &&
      /* For compatibility with before the introduction of the split flag */
      ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE) &&
      !leftoverblock) {
    nsplits = typesize;
  }
  else {
    nsplits = 1;
  }

  neblock = blocksize / nsplits;
  for (j = 0; j < nsplits; j++) {
    /* Validate src_offset.  The comparison is done in signed arithmetic on
       purpose: with the unsigned result of sizeof, a compressedsize smaller
       than 4 would wrap around to a huge value and let any offset through. */
    if (src_offset < 0 ||
        src_offset > compressedsize - (int32_t)sizeof(int32_t)) {
      return -1;
    }
    cbytes = sw32_(base_src + src_offset);   /* amount of compressed bytes */
    src_offset += (int32_t)sizeof(int32_t);
    /* Validate cbytes */
    if (cbytes < 0 || cbytes > context->compressedsize - src_offset) {
      return -1;
    }
    ctbytes += (int32_t)sizeof(int32_t);
    src = base_src + src_offset;
    /* Uncompress */
    if (cbytes == neblock) {
      /* Split was stored verbatim */
      fastcopy(_tmp, src, neblock);
      nbytes = neblock;
    }
    else {
      nbytes = context->decompress_func(src, cbytes, _tmp, neblock);
      /* Check that decompressed bytes number is correct */
      if (nbytes != neblock) {
        return -2;
      }
    }
    src_offset += cbytes;
    ctbytes += cbytes;
    _tmp += nbytes;
    ntbytes += nbytes;
  } /* Closes j < nsplits */

  if (doshuffle) {
    blosc_internal_unshuffle(typesize, blocksize, tmp, dest);
  }
  else if (dobitshuffle) {
    bscount = blosc_internal_bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
    if (bscount < 0)
      return bscount;
  }

  /* Return the number of uncompressed bytes */
  return ntbytes;
}
/* Serial version for compression/decompression.
 *
 * Processes every block of the job in order, in the calling thread.  One
 * scratch allocation is shared by all blocks: `tmp` takes the first
 * `blocksize` bytes and `tmp2` the rest.
 *
 * Returns the total number of output bytes, 0 when compressing data that
 * does not fit in the destination (uncompressible), or a negative error
 * code: the one reported by blosc_c / blosc_d, or -1 if the scratch buffer
 * cannot be allocated. */
static int serial_blosc(struct blosc_context* context)
{
  int32_t j, bsize, leftoverblock;
  int32_t cbytes;
  int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
  int32_t ntbytes = context->num_output_bytes;
  uint8_t *tmp = my_malloc(context->blocksize + ebsize);
  uint8_t *tmp2;

  if (tmp == NULL) {
    /* Without the scratch buffer the shuffle filters would write through a
       NULL pointer; give up cleanly instead. */
    return -1;
  }
  tmp2 = tmp + context->blocksize;

  for (j = 0; j < context->nblocks; j++) {
    if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
      /* Record where this block starts in the output */
      _sw32(context->bstarts + j * 4, ntbytes);
    }
    bsize = context->blocksize;
    leftoverblock = 0;
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
      bsize = context->leftover;
      leftoverblock = 1;
    }
    if (context->compress) {
      if (*(context->header_flags) & BLOSC_MEMCPYED) {
        /* We want to memcpy only */
        fastcopy(context->dest + BLOSC_MAX_OVERHEAD + j * context->blocksize,
                 context->src + j * context->blocksize, bsize);
        cbytes = bsize;
      }
      else {
        /* Regular compression */
        cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
                         context->destsize, context->src+j*context->blocksize,
                         context->dest+ntbytes, tmp, tmp2);
        if (cbytes == 0) {
          ntbytes = 0;              /* uncompressible data */
          break;
        }
      }
    }
    else {
      if (*(context->header_flags) & BLOSC_MEMCPYED) {
        /* We want to memcpy only */
        fastcopy(context->dest + j * context->blocksize,
                 context->src + BLOSC_MAX_OVERHEAD + j * context->blocksize, bsize);
        cbytes = bsize;
      }
      else {
        /* Regular decompression */
        cbytes = blosc_d(context, bsize, leftoverblock, context->src,
                         sw32_(context->bstarts + j * 4),
                         context->dest + j * context->blocksize, tmp, tmp2);
      }
    }
    if (cbytes < 0) {
      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
      break;
    }
    ntbytes += cbytes;
  }

  /* Free temporaries */
  my_free(tmp);
  return ntbytes;
}
/* Threaded version for compression/decompression.
 *
 * Makes sure the thread pool matches the context, resets the shared
 * sentinels and then meets the worker threads at the two synchronization
 * points: WAIT_INIT and WAIT_FINISH.
 *
 * Returns context->num_output_bytes when thread_giveup_code is still
 * positive after the run, otherwise thread_giveup_code itself as the error
 * code.  Returns -1 if the thread pool cannot be set up or (barrier build
 * only) if waiting on a barrier fails. */
static int parallel_blosc(struct blosc_context* context)
{
/* `rc` is assigned inside the barrier variants of WAIT_INIT/WAIT_FINISH */
int rc;
(void)rc; // just to avoid 'unused-variable' warning
/* Check whether we need to restart threads */
if (blosc_set_nthreads_(context) < 0) {
return -1;
}
/* Set sentinels */
context->thread_giveup_code = 1;
context->thread_nblock = -1;
/* Synchronization point for all threads (wait for initialization) */
WAIT_INIT(-1, context);
/* Synchronization point for all threads (wait for finalization) */
WAIT_FINISH(-1, context);
if (context->thread_giveup_code > 0) {
/* Return the total bytes (de-)compressed in threads */
return context->num_output_bytes;
}
else {
/* Compression/decompression gave up. Return error code. */
return context->thread_giveup_code;
}
}
/* Run the job described by `context`, compressing or decompressing as the
   context says.  The serial path is taken when a single thread was requested
   or when the buffer holds at most one whole block; otherwise the work goes
   to the thread pool.  Returns what the chosen path returns. */
static int do_job(struct blosc_context* context)
{
  const int use_serial =
      (context->numthreads == 1) ||
      ((context->sourcesize / context->blocksize) <= 1);

  return use_serial ? serial_blosc(context) : parallel_blosc(context);
}
/* 1 if `codec` is one of the High Compression Ratio codecs (LZ4HC, Zlib,
   Zstd), 0 otherwise.  The argument may be evaluated more than once. */
#define HCR(codec) \
  ((((codec) == BLOSC_LZ4HC) || ((codec) == BLOSC_ZLIB) || ((codec) == BLOSC_ZSTD)) ? 1 : 0)
/* Decide whether a block should be split before compressing it with a codec,
 * according to the global split mode.
 *
 * Returns 1 to split, 0 not to split, or -1 (after printing a message on
 * stderr) when the split mode is not one of the known values. */
static int split_block(int compcode, int typesize, int blocksize) {
  if (g_splitmode == BLOSC_ALWAYS_SPLIT) {
    return 1;
  }
  if (g_splitmode == BLOSC_NEVER_SPLIT) {
    return 0;
  }
  if (g_splitmode == BLOSC_AUTO_SPLIT) {
    /* Normally all the compressors designed for speed benefit from a
       split.  However, in the benchmarks conducted, LZ4 seems to run
       faster if we don't split, which is quite surprising. */
    return (((compcode == BLOSC_BLOSCLZ) ||
             (compcode == BLOSC_SNAPPY)) &&
            (typesize <= MAX_SPLITS) &&
            (blocksize / typesize) >= MIN_BUFFERSIZE);
  }
  if (g_splitmode == BLOSC_FORWARD_COMPAT_SPLIT) {
    /* The zstd support was introduced at the same time as the split flag,
     * so there should be no problem with not splitting blocks with it. */
    return ((compcode != BLOSC_ZSTD) &&
            (typesize <= MAX_SPLITS) &&
            (blocksize / typesize) >= MIN_BUFFERSIZE);
  }
  fprintf(stderr, "Split mode %d not supported", g_splitmode);
  return -1;
}
/* Choose the block size for compressing `nbytes` bytes of `typesize`-byte
 * items at compression level `clevel`.
 *
 * A non-zero `forced_blocksize` is used as the starting point, clamped to
 * [MIN_BUFFERSIZE, BLOSC_MAX_BLOCKSIZE].  Otherwise, for buffers of at least
 * L1 bytes, the size is derived from L1, the level and whether the codec is
 * a high-compression-ratio one; smaller buffers become a single block.  When
 * the codec is going to split blocks the size is scaled by `typesize` and
 * kept between 64 KB and 1 MB.
 *
 * The result never exceeds `nbytes` and, when larger than `typesize`, is a
 * multiple of it.  Buffers shorter than one item get a block size of 1. */
static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
                                 int32_t typesize, int32_t nbytes,
                                 int32_t forced_blocksize)
{
  int32_t bs = nbytes;   /* start by a whole buffer as blocksize */

  /* Protection against very small buffers */
  if (nbytes < (int32_t)typesize) {
    return 1;
  }

  if (forced_blocksize) {
    bs = forced_blocksize;
    /* Keep the forced blocksize within the supported range */
    if (bs < MIN_BUFFERSIZE) {
      bs = MIN_BUFFERSIZE;
    }
    if (bs > BLOSC_MAX_BLOCKSIZE) {
      bs = BLOSC_MAX_BLOCKSIZE;
    }
  }
  else if (nbytes >= L1) {
    /* HCR codecs are meant for compressing large blocks (they show a big
       overhead when compressing small ones), so they start from twice L1. */
    const int hcr = HCR(context->compcode);

    bs = hcr ? 2 * L1 : L1;
    switch (clevel) {
      case 0:            /* plain copy */
        bs /= 4;
        break;
      case 1:
        bs /= 2;
        break;
      case 2:            /* keep the base size */
        break;
      case 3:
        bs *= 2;
        break;
      case 4:
      case 5:
        bs *= 4;
        break;
      case 6:
      case 7:
      case 8:
        bs *= 8;
        break;
      case 9:            /* HCR codecs double once more at the top level */
        bs *= hcr ? 16 : 8;
        break;
      default:
        assert(0);
        break;
    }
  }

  /* Enlarge the blocksize for splittable codecs */
  if (clevel > 0 && split_block(context->compcode, typesize, bs)) {
    /* Do not use a too large split buffer (> 256 KB) for splitting codecs */
    if (bs > (1 << 18)) {
      bs = (1 << 18);
    }
    bs *= typesize;
    /* Do not use a too small blocksize (< 64 KB) when typesize is small */
    if (bs < (1 << 16)) {
      bs = (1 << 16);
    }
    /* But do not exceed 1 MB per thread (having this capacity in L3 is
       normal in modern CPUs) */
    if (bs > 1024 * 1024) {
      bs = 1024 * 1024;
    }
  }

  /* Check that blocksize is not too large */
  if (bs > (int32_t)nbytes) {
    bs = nbytes;
  }

  /* blocksize *must absolutely* be a multiple of the typesize */
  if (bs > typesize) {
    bs = bs / typesize * typesize;
  }
  return bs;
}
/* Fill in `context` for a compression job and validate the parameters.
 *
 * Stores the buffers, sizes, compressor, thread count and level in the
 * context, then computes the block size, the number of blocks and the size
 * of the trailing partial block.
 *
 * A `typesize` of 0 or larger than BLOSC_MAX_TYPESIZE is not an error: the
 * buffer is then treated as a stream of 1-byte items.
 *
 * Returns 1 on success, 0 if `sourcesize` exceeds BLOSC_MAX_BUFFERSIZE or
 * `destsize` is smaller than BLOSC_MAX_OVERHEAD (a message is printed only
 * when the BLOSC_WARN environment variable is a positive number), and -10 if
 * `clevel` is outside 0..9 or `doshuffle` is not 0, 1 or 2. */
static int initialize_context_compression(struct blosc_context* context,
                                          int clevel,
                                          int doshuffle,
                                          size_t typesize,
                                          size_t sourcesize,
                                          const void* src,
                                          void* dest,
                                          size_t destsize,
                                          int32_t compressor,
                                          int32_t blocksize,
                                          int32_t numthreads)
{
  char *envvar = NULL;
  int warnlvl = 0;

  /* Set parameters */
  context->compress = 1;
  context->src = (const uint8_t*)src;
  context->dest = (uint8_t *)(dest);
  context->num_output_bytes = 0;
  context->destsize = (int32_t)destsize;
  context->sourcesize = (int32_t)sourcesize;
  context->typesize = (int32_t)typesize;
  context->compcode = compressor;
  context->numthreads = numthreads;
  context->end_threads = 0;
  context->clevel = clevel;

  envvar = getenv("BLOSC_WARN");
  if (envvar != NULL) {
    warnlvl = (int)strtol(envvar, NULL, 10);
  }

  /* Check buffer size limits */
  if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
    if (warnlvl > 0) {
      fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
              BLOSC_MAX_BUFFERSIZE);
    }
    return 0;
  }
  if (destsize < BLOSC_MAX_OVERHEAD) {
    if (warnlvl > 0) {
      fprintf(stderr, "Output buffer size should be larger than %d bytes\n",
              BLOSC_MAX_OVERHEAD);
    }
    return 0;
  }

  /* Compression level */
  if (clevel < 0 || clevel > 9) {
    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
    return -10;
  }

  /* Shuffle */
  if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
    fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
    return -10;
  }

  /* Check typesize limits.  The test is made on the caller's size_t value,
     not on the int32_t copy, so a value that truncates to something small
     cannot slip through.  Zero is rejected too: the block size computations
     divide by the typesize. */
  if (typesize == 0 || typesize > BLOSC_MAX_TYPESIZE) {
    /* If typesize is unusable, treat buffer as an 1-byte stream. */
    context->typesize = 1;
  }

  /* Get the blocksize */
  context->blocksize = compute_blocksize(context, clevel,
                                         (int32_t)context->typesize,
                                         context->sourcesize, blocksize);

  /* Compute number of blocks in buffer */
  context->nblocks = context->sourcesize / context->blocksize;
  context->leftover = context->sourcesize % context->blocksize;
  context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
  return 1;
}
static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
{
int32_t compformat;
int dont_split;
/* Write version header for this block */
context->dest[0] = BLOSC_VERSION_FORMAT; /* blosc format version */
/* Write compressor format */
compformat = -1;
switch (context->compcode)
{
case BLOSC_BLOSCLZ:
compformat = BLOSC_BLOSCLZ_FORMAT;
context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
break;
#if defined(HAVE_LZ4)
case BLOSC_LZ4:
compformat = BLOSC_LZ4_FORMAT;
context->dest[1] = BLOSC_LZ4_VERSION_FORMAT; /* lz4 format version */
break;
case BLOSC_LZ4HC:
compformat = BLOSC_LZ4HC_FORMAT;
context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
break;
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
case BLOSC_SNAPPY:
compformat = BLOSC_SNAPPY_FORMAT;
context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT; /* snappy format version */
break;
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
case BLOSC_ZLIB:
compformat = BLOSC_ZLIB_FORMAT;
context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT; /* zlib format version */
break;
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
case BLOSC_ZSTD:
compformat = BLOSC_ZSTD_FORMAT;
context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT; /* zstd format version */
break;
#endif /* HAVE_ZSTD */
default:
{
const char *compname;
compname = clibcode_to_clibname(compformat);
if (compname == NULL) {
compname = "(null)";
}
fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
fprintf(stderr, "compression support. Please use one having it.");
return -5; /* signals no compression support */
break;
}
}
context->header_flags = context->dest+2; /* flags */
context->dest[2] = 0; /* zeroes flags */
context->dest[3] = (uint8_t)context->typesize; /* type size */
_sw32(context->dest + 4, context->sourcesize); /* size of the buffer */
_sw32(context->dest + 8, context->blocksize); /* block size */
context->bstarts = context->dest + 16; /* starts for every block */
context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks; /* space for header and pointers */
if (context->clevel == 0) {
/* Compression level 0 means buffer to be memcpy'ed */
*(context->header_flags) |= BLOSC_MEMCPYED;
context->num_output_bytes = 16; /* space just for header */
}
if (context->sourcesize < MIN_BUFFERSIZE) {
/* Buffer is too small. Try memcpy'ing. */
*(context->header_flags) |= BLOSC_MEMCPYED;
context->num_output_bytes = 16; /* space just for header */
}
if (doshuffle == BLOSC_SHUFFLE) {
/* Byte-shuffle is active */
*(context->header_flags) |= BLOSC_DOSHUFFLE; /* bit 0 set to one in flags */
}
if (doshuffle == BLOSC_BITSHUFFLE) {
/* Bit-shuffle is active */
*(context->header_flags) |= BLOSC_DOBITSHUFFLE; /* bit 2 set to one in flags */
}
dont_split = !split_block(context->compcode, context->typesize,
context->blocksize);
*(context->header_flags) |= dont_split << 4; /* dont_split is in bit 4 */
*(context->header_flags) |= compformat << 5; /* compressor format starts at bit 5 */
return 1;
}
/* Run the compression job already described by `context` (header written,
 * buffers set) and store the resulting size in bytes 12..15 of the header.
 *
 * Returns the number of bytes written to the destination, 0 when the data
 * cannot be stored in the destination buffer, or -1 when the job fails.
 */
int blosc_compress_context(struct blosc_context* context)
{
  int32_t written;

  if (*(context->header_flags) & BLOSC_MEMCPYED) {
    if (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize) {
      /* A plain copy would overrun the destination */
      return 0;
    }
  }

  /* First attempt: whatever mode the header flags currently select */
  written = do_job(context);
  if (written < 0) {
    return -1;
  }

  if (written == 0) {
    if (context->sourcesize + BLOSC_MAX_OVERHEAD <= context->destsize) {
      /* The first attempt produced nothing, but a raw copy of `src` still
         fits in `dest`: switch the header to memcpy mode and run again. */
      *(context->header_flags) |= BLOSC_MEMCPYED;
      context->num_output_bytes = BLOSC_MAX_OVERHEAD;  /* discard the previous byte count */
      written = do_job(context);
      if (written < 0) {
        return -1;
      }
    }
  }

  /* Record the number of compressed bytes in the header */
  _sw32(context->dest + 12, written);

  assert(written <= context->destsize);
  return written;
}
/* Public compression entry point that works on a private, stack-allocated
   context instead of the global one.  Returns whatever the first failing
   step returns (a value <= 0), otherwise the result of
   blosc_compress_context(). */
int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
                       size_t nbytes, const void* src, void* dest,
                       size_t destsize, const char* compressor,
                       size_t blocksize, int numinternalthreads)
{
  struct blosc_context context;
  int compcode;
  int status;

  context.threads_started = 0;

  compcode = blosc_compname_to_compcode(compressor);
  status = initialize_context_compression(&context, clevel, doshuffle, typesize,
                                          nbytes, src, dest, destsize,
                                          compcode, blocksize,
                                          numinternalthreads);
  if (status > 0) {
    status = write_compression_header(&context, clevel, doshuffle);
  }
  if (status <= 0) {
    return status;
  }

  status = blosc_compress_context(&context);

  /* A thread pool can only belong to this context when more than one
     thread was requested */
  if (numinternalthreads > 1) {
    blosc_release_threadpool(&context);
  }

  return status;
}
/* Parse a base-10 integer out of an environment variable value.
   Returns 1 and stores the number in `*out` when `text` begins with a number
   that fits in a long (trailing characters are tolerated).  Returns 0 and
   leaves `*out` untouched when no digits could be read or the value is out
   of range. */
static int blosc_env_to_long(const char* text, long* out)
{
  char* end = NULL;
  long parsed;

  errno = 0;
  parsed = strtol(text, &end, 10);
  if ((end == text) || (errno == ERANGE)) {
    return 0;
  }
  *out = parsed;
  return 1;
}

/* The public routine for compression.  See blosc.h for docstrings.

   Before compressing, the following environment variables may override the
   arguments or the global settings: BLOSC_CLEVEL, BLOSC_SHUFFLE,
   BLOSC_TYPESIZE, BLOSC_COMPRESSOR, BLOSC_BLOCKSIZE, BLOSC_NTHREADS,
   BLOSC_SPLITMODE and BLOSC_NOLOCK.  Numeric variables that cannot be parsed
   are ignored.  (The previous check compared the strtol() result against
   EINVAL, which rejected the legitimate value 22 and accepted unparsable
   text as 0.) */
int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
                   const void *src, void *dest, size_t destsize)
{
  int result;
  char* envvar;

  /* Check if should initialize */
  if (!g_initlib) blosc_init();

  /* Check for environment variables */
  envvar = getenv("BLOSC_CLEVEL");
  if (envvar != NULL) {
    long value;
    if (blosc_env_to_long(envvar, &value) && (value >= 0)) {
      clevel = (int)value;
    }
  }

  envvar = getenv("BLOSC_SHUFFLE");
  if (envvar != NULL) {
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
      doshuffle = BLOSC_NOSHUFFLE;
    }
    if (strcmp(envvar, "SHUFFLE") == 0) {
      doshuffle = BLOSC_SHUFFLE;
    }
    if (strcmp(envvar, "BITSHUFFLE") == 0) {
      doshuffle = BLOSC_BITSHUFFLE;
    }
  }

  envvar = getenv("BLOSC_TYPESIZE");
  if (envvar != NULL) {
    long value;
    if (blosc_env_to_long(envvar, &value) && (value > 0)) {
      typesize = (size_t)value;
    }
  }

  envvar = getenv("BLOSC_COMPRESSOR");
  if (envvar != NULL) {
    result = blosc_set_compressor(envvar);
    if (result < 0) { return result; }
  }

  envvar = getenv("BLOSC_BLOCKSIZE");
  if (envvar != NULL) {
    long blocksize;
    if (blosc_env_to_long(envvar, &blocksize) && (blocksize > 0)) {
      blosc_set_blocksize((size_t)blocksize);
    }
  }

  envvar = getenv("BLOSC_NTHREADS");
  if (envvar != NULL) {
    long nthreads;
    if (blosc_env_to_long(envvar, &nthreads) && (nthreads > 0)) {
      result = blosc_set_nthreads((int)nthreads);
      if (result < 0) { return result; }
    }
  }

  envvar = getenv("BLOSC_SPLITMODE");
  if (envvar != NULL) {
    if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
      blosc_set_splitmode(BLOSC_FORWARD_COMPAT_SPLIT);
    }
    else if (strcmp(envvar, "AUTO") == 0) {
      blosc_set_splitmode(BLOSC_AUTO_SPLIT);
    }
    else if (strcmp(envvar, "ALWAYS") == 0) {
      blosc_set_splitmode(BLOSC_ALWAYS_SPLIT);
    }
    else if (strcmp(envvar, "NEVER") == 0) {
      blosc_set_splitmode(BLOSC_NEVER_SPLIT);
    }
    else {
      fprintf(stderr, "BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
      return -1;
    }
  }

  /* Check for a BLOSC_NOLOCK environment variable.  It is important
     that this should be the last env var so that it can take the
     previous ones into account */
  envvar = getenv("BLOSC_NOLOCK");
  if (envvar != NULL) {
    const char *compname;
    blosc_compcode_to_compname(g_compressor, &compname);
    /* Lock-free path: compress with a private context */
    result = blosc_compress_ctx(clevel, doshuffle, typesize,
                                nbytes, src, dest, destsize,
                                compname, g_force_blocksize, g_threads);
    return result;
  }

  /* Default path: use the shared global context while holding the global
     compression mutex */
  pthread_mutex_lock(global_comp_mutex);

  do {
    result = initialize_context_compression(g_global_context, clevel, doshuffle,
                                            typesize, nbytes, src, dest, destsize,
                                            g_compressor, g_force_blocksize,
                                            g_threads);
    if (result <= 0) { break; }

    result = write_compression_header(g_global_context, clevel, doshuffle);
    if (result <= 0) { break; }

    result = blosc_compress_context(g_global_context);
  } while (0);

  pthread_mutex_unlock(global_comp_mutex);

  return result;
}
/* Decompress the Blosc buffer at `src` into `dest` using `context`.
 *
 * The 16-byte header is read from `src` and validated before any block is
 * processed: format version, flags, blocksize, typesize, the uncompressed
 * size against `destsize`, and the compressed size against either the
 * memcpy layout or the block-start table.
 *
 * Returns the number of bytes written to `dest`, 0 when the header says the
 * original buffer was empty, the non-zero result of
 * initialize_decompress_func() when that call fails, or -1 on any other
 * validation or decompression failure.
 */
static int blosc_run_decompression_with_context(struct blosc_context* context,
                                                const void* src,
                                                void* dest,
                                                size_t destsize,
                                                int numinternalthreads)
{
  uint8_t version;
  int32_t ntbytes;

  context->compress = 0;
  context->src = (const uint8_t*)src;
  context->dest = (uint8_t*)dest;
  context->destsize = destsize;
  context->num_output_bytes = 0;
  context->numthreads = numinternalthreads;
  context->end_threads = 0;

  /* Read the header block */
  version = context->src[0];                        /* blosc format version */
  context->compversion = context->src[1];

  context->header_flags = (uint8_t*)(context->src + 2);           /* flags */
  context->typesize = (int32_t)context->src[3];      /* typesize */
  context->sourcesize = sw32_(context->src + 4);     /* buffer size */
  context->blocksize = sw32_(context->src + 8);      /* block size */
  context->compressedsize = sw32_(context->src + 12); /* compressed buffer size */
  context->bstarts = (uint8_t*)(context->src + 16);

  if (context->sourcesize == 0) {
    /* Source buffer was empty, so we are done */
    return 0;
  }

  if (context->sourcesize < 0) {
    /* A negative size can only come from a corrupt header */
    return -1;
  }

  if (context->blocksize <= 0 || context->blocksize > destsize ||
      context->blocksize > BLOSC_MAX_BLOCKSIZE || context->typesize <= 0 ||
      context->typesize > BLOSC_MAX_TYPESIZE) {
    return -1;
  }

  if (version != BLOSC_VERSION_FORMAT) {
    /* Version from future */
    return -1;
  }
  if (*context->header_flags & 0x08) {
    /* compressor flags from the future */
    return -1;
  }

  /* Compute some params */
  /* Total blocks */
  context->nblocks = context->sourcesize / context->blocksize;
  context->leftover = context->sourcesize % context->blocksize;
  context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;

  /* Check that we have enough space to decompress.  The comparison is done
     as size_t (sourcesize is known to be positive here): narrowing
     `destsize` to int32_t would turn buffers of 2 GiB or more into zero or
     negative capacities and reject them. */
  if ((size_t)context->sourcesize > destsize) {
    return -1;
  }

  if (*(context->header_flags) & BLOSC_MEMCPYED) {
    /* Validate that compressed size is equal to decompressed size + header
       size. */
    if (context->sourcesize + BLOSC_MAX_OVERHEAD != context->compressedsize) {
      return -1;
    }
  } else {
    ntbytes = initialize_decompress_func(context);
    if (ntbytes != 0) return ntbytes;

    /* Validate that compressed size is large enough to hold the bstarts array */
    if (context->nblocks > (context->compressedsize - 16) / 4) {
      return -1;
    }
  }

  /* Do the actual decompression */
  ntbytes = do_job(context);
  if (ntbytes < 0) {
    return -1;
  }

  assert((size_t)ntbytes <= destsize);
  return ntbytes;
}
/* Decompress `src` into `dest` with a private, stack-allocated context.
   Returns the result of blosc_run_decompression_with_context(). */
int blosc_decompress_ctx(const void* src, void* dest, size_t destsize,
                         int numinternalthreads) {
  struct blosc_context context;
  int outcome;

  context.threads_started = 0;
  outcome = blosc_run_decompression_with_context(&context, src, dest, destsize,
                                                 numinternalthreads);

  /* A thread pool can only belong to this context when more than one
     thread was requested */
  if (numinternalthreads > 1) {
    blosc_release_threadpool(&context);
  }

  return outcome;
}
/* Public decompression entry point working on the global context.

   Honors two environment variables: BLOSC_NTHREADS (positive integer;
   unparsable or out-of-range values are ignored) and BLOSC_NOLOCK (when set,
   a private context is used and the global mutex is not taken).

   Returns the result of the underlying decompression call, or the negative
   result of blosc_set_nthreads() when that call fails. */
int blosc_decompress(const void* src, void* dest, size_t destsize) {
  int result;
  char* envvar;
  long nthreads;

  /* Check if should initialize */
  if (!g_initlib) blosc_init();

  /* Check for a BLOSC_NTHREADS environment variable */
  envvar = getenv("BLOSC_NTHREADS");
  if (envvar != NULL) {
    char* end = NULL;

    /* strtol() reports failures through `end` and errno, never through its
       return value.  Comparing the result against EINVAL (22) would wrongly
       discard a request for 22 threads. */
    errno = 0;
    nthreads = strtol(envvar, &end, 10);
    if ((end != envvar) && (errno != ERANGE) && (nthreads > 0)) {
      result = blosc_set_nthreads((int)nthreads);
      if (result < 0) { return result; }
    }
  }

  /* Check for a BLOSC_NOLOCK environment variable.  It is important
     that this should be the last env var so that it can take the
     previous ones into account */
  envvar = getenv("BLOSC_NOLOCK");
  if (envvar != NULL) {
    result = blosc_decompress_ctx(src, dest, destsize, g_threads);
    return result;
  }

  pthread_mutex_lock(global_comp_mutex);

  result = blosc_run_decompression_with_context(g_global_context, src, dest,
                                                destsize, g_threads);

  pthread_mutex_unlock(global_comp_mutex);

  return result;
}
/* Extract `nitems` items, starting at item index `start`, from the
 * compressed buffer `src` into `dest`.  The item size is the typesize
 * stored in the header of `src`.
 *
 * Returns the number of bytes copied into `dest`, -9 when the header has an
 * unexpected format version, the non-zero result of
 * initialize_decompress_func() when that call fails, a negative value
 * propagated from blosc_d() when a block fails to decompress, or -1 for an
 * invalid header, an out-of-range request or an allocation failure.
 *
 * The requested range is validated before the scratch buffer is allocated,
 * so no error path leaves the allocation behind.
 */
int blosc_getitem(const void* src, int start, int nitems, void* dest) {
  uint8_t *_src=NULL;               /* current pos for source buffer */
  uint8_t version, compversion;     /* versions for compressed header */
  uint8_t flags;                    /* flags for header */
  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
  int32_t nblocks;                  /* number of total blocks in buffer */
  int32_t leftover;                 /* extra bytes at end of buffer */
  uint8_t *bstarts;                 /* start pointers for each block */
  int32_t typesize, blocksize, nbytes, compressedsize;
  int32_t j, bsize, bsize2, leftoverblock;
  int32_t cbytes, startb, stopb;
  int stop;
  uint8_t *tmp;
  uint8_t *tmp2;
  uint8_t *tmp3;
  int32_t ebsize;
  struct blosc_context context = {0};

  _src = (uint8_t *)(src);

  /* Read the header block */
  version = _src[0];                        /* blosc format version */
  compversion = _src[1];
  flags = _src[2];                          /* flags */
  typesize = (int32_t)_src[3];              /* typesize */
  nbytes = sw32_(_src + 4);                 /* buffer size */
  blocksize = sw32_(_src + 8);              /* block size */
  compressedsize = sw32_(_src + 12);        /* compressed buffer size */

  if (version != BLOSC_VERSION_FORMAT)
    return -9;

  if (blocksize <= 0 || blocksize > nbytes || blocksize > BLOSC_MAX_BLOCKSIZE ||
      typesize <= 0 || typesize > BLOSC_MAX_TYPESIZE) {
    return -1;
  }

  /* Compute some params */
  /* Total blocks */
  nblocks = nbytes / blocksize;
  leftover = nbytes % blocksize;
  nblocks = (leftover>0)? nblocks+1: nblocks;

  /* Only initialize the fields blosc_d uses */
  context.typesize = typesize;
  context.header_flags = &flags;
  context.compversion = compversion;
  context.compressedsize = compressedsize;
  if (flags & BLOSC_MEMCPYED) {
    if (nbytes + BLOSC_MAX_OVERHEAD != compressedsize) {
      return -1;
    }
  } else {
    ntbytes = initialize_decompress_func(&context);
    if (ntbytes != 0) return ntbytes;

    /* Validate the compressed size against the size of the bstarts array */
    if (nblocks >= (compressedsize - 16) / 4) {
      return -1;
    }
  }

  /* Check region boundaries.  The products are computed in 64 bits so that
     large arguments cannot overflow `int`, and a negative `nitems` is
     rejected because it would produce a negative copy length below. */
  if ((start < 0) || ((int64_t)start * typesize > nbytes)) {
    fprintf(stderr, "`start` out of bounds");
    return -1;
  }

  if ((nitems < 0) || (((int64_t)start + nitems) * typesize > nbytes)) {
    fprintf(stderr, "`start`+`nitems` out of bounds");
    return -1;
  }

  /* Safe now: (start + nitems) * typesize <= nbytes, which fits in int32_t */
  stop = start + nitems;

  /* Scratch space: [tmp | tmp2 | tmp3] = [blocksize | ebsize | blocksize] */
  ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
  tmp = my_malloc(blocksize + ebsize + blocksize);
  if (tmp == NULL) {
    return -1;
  }
  tmp2 = tmp + blocksize;
  tmp3 = tmp + blocksize + ebsize;

  /* The block-start table follows the 16-byte header */
  bstarts = _src + 16;

  for (j = 0; j < nblocks; j++) {
    bsize = blocksize;
    leftoverblock = 0;
    if ((j == nblocks - 1) && (leftover > 0)) {
      bsize = leftover;
      leftoverblock = 1;
    }

    /* Compute start & stop for each block */
    startb = start * typesize - j * blocksize;
    stopb = stop * typesize - j * blocksize;
    if ((startb >= (int)blocksize) || (stopb <= 0)) {
      /* The requested range does not touch this block */
      continue;
    }
    if (startb < 0) {
      startb = 0;
    }
    if (stopb > (int)blocksize) {
      stopb = blocksize;
    }
    bsize2 = stopb - startb;

    /* Do the actual data copy */
    if (flags & BLOSC_MEMCPYED) {
      /* We want to memcpy only */
      fastcopy((uint8_t *) dest + ntbytes,
               (uint8_t *) src + BLOSC_MAX_OVERHEAD + j * blocksize + startb, bsize2);
      cbytes = bsize2;
    }
    else {
      /* Regular decompression.  Put results in tmp2. */
      cbytes = blosc_d(&context, bsize, leftoverblock,
                       (uint8_t *)src, sw32_(bstarts + j * 4),
                       tmp2, tmp, tmp3);
      if (cbytes < 0) {
        ntbytes = cbytes;
        break;
      }
      /* Copy to destination */
      fastcopy((uint8_t *) dest + ntbytes, tmp2 + startb, bsize2);
      cbytes = bsize2;
    }
    ntbytes += cbytes;
  }

  my_free(tmp);

  return ntbytes;
}
/* Worker thread entry point: compress or decompress several blocks in a
   single thread.

   `ctxt` is a `struct thread_context*`.  The thread loops until
   parent_context->end_threads is set: on each round it waits at the
   initialization sync point, processes its share of blocks for the job
   described by parent_context, and then waits at the finalization sync
   point.  Before returning, the thread frees its scratch buffer and the
   thread context itself.  Always returns NULL. */
static void *t_blosc(void *ctxt)
{
struct thread_context* context = (struct thread_context*)ctxt;
int32_t cbytes, ntdest;
int32_t tblocks; /* number of blocks per thread */
int32_t leftover2;
int32_t tblock; /* limit block on a thread */
int32_t nblock_; /* private copy of nblock */
int32_t bsize, leftoverblock;
/* Parameters for threads */
int32_t blocksize;
int32_t ebsize;
int32_t compress;
int32_t maxbytes;
int32_t ntbytes;
int32_t flags;
int32_t nblocks;
int32_t leftover;
uint8_t *bstarts;
const uint8_t *src;
uint8_t *dest;
uint8_t *tmp;
uint8_t *tmp2;
uint8_t *tmp3;
int rc;
(void)rc; // just to avoid 'unused-variable' warning
while(1)
{
/* Synchronization point for all threads (wait for initialization) */
WAIT_INIT(NULL, context->parent_context);
/* The parent sets end_threads to ask the workers to exit */
if(context->parent_context->end_threads)
{
break;
}
/* Get parameters for this thread before entering the main loop */
blocksize = context->parent_context->blocksize;
/* ebsize: one block plus sizeof(int32_t) extra bytes per byte of typesize */
ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
compress = context->parent_context->compress;
flags = *(context->parent_context->header_flags);
maxbytes = context->parent_context->destsize;
nblocks = context->parent_context->nblocks;
leftover = context->parent_context->leftover;
bstarts = context->parent_context->bstarts;
src = context->parent_context->src;
dest = context->parent_context->dest;
/* Replace the scratch space when this job's blocksize exceeds the recorded
   one.  Layout: [tmp | tmp2 | tmp3] = [blocksize | ebsize | blocksize].
   NOTE(review): context->tmpblocksize is not updated here, so later rounds
   keep comparing against the previously recorded size -- confirm whether
   that is intended. */
if (blocksize > context->tmpblocksize)
{
my_free(context->tmp);
context->tmp = my_malloc(blocksize + ebsize + blocksize);
context->tmp2 = context->tmp + blocksize;
context->tmp3 = context->tmp + blocksize + ebsize;
}
tmp = context->tmp;
tmp2 = context->tmp2;
tmp3 = context->tmp3;
ntbytes = 0; /* only useful for decompression */
if (compress && !(flags & BLOSC_MEMCPYED)) {
/* Compression always has to follow the block order */
/* Claim the next block index from the shared counter under count_mutex */
pthread_mutex_lock(&context->parent_context->count_mutex);
context->parent_context->thread_nblock++;
nblock_ = context->parent_context->thread_nblock;
pthread_mutex_unlock(&context->parent_context->count_mutex);
tblock = nblocks;
}
else {
/* Decompression can happen using any order. We choose
sequential block order on each thread */
/* Blocks per thread */
tblocks = nblocks / context->parent_context->numthreads;
leftover2 = nblocks % context->parent_context->numthreads;
tblocks = (leftover2>0)? tblocks+1: tblocks;
/* This thread handles the contiguous range [nblock_, tblock) */
nblock_ = context->tid*tblocks;
tblock = nblock_ + tblocks;
if (tblock > nblocks) {
tblock = nblocks;
}
}
/* Loop over blocks */
/* thread_giveup_code > 0 means "keep going"; any thread may lower it */
leftoverblock = 0;
while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
bsize = blocksize;
/* The last block may be shorter than blocksize */
if (nblock_ == (nblocks - 1) && (leftover > 0)) {
bsize = leftover;
leftoverblock = 1;
}
if (compress) {
if (flags & BLOSC_MEMCPYED) {
/* We want to memcpy only */
fastcopy(dest + BLOSC_MAX_OVERHEAD + nblock_ * blocksize,
src + nblock_ * blocksize, bsize);
cbytes = bsize;
}
else {
/* Regular compression */
/* Output goes to tmp2; it is copied to `dest` further below */
cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
src+nblock_*blocksize, tmp2, tmp, tmp3);
}
}
else {
if (flags & BLOSC_MEMCPYED) {
/* We want to memcpy only */
fastcopy(dest + nblock_ * blocksize,
src + BLOSC_MAX_OVERHEAD + nblock_ * blocksize, bsize);
cbytes = bsize;
}
else {
/* Regular decompression, written straight to the block's place in `dest` */
cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
src, sw32_(bstarts + nblock_ * 4),
dest+nblock_*blocksize,
tmp, tmp2);
}
}
/* Check whether current thread has to giveup */
if (context->parent_context->thread_giveup_code <= 0) {
break;
}
/* Check results for the compressed/decompressed block */
if (cbytes < 0) { /* compr/decompr failure */
/* Set giveup_code error */
pthread_mutex_lock(&context->parent_context->count_mutex);
context->parent_context->thread_giveup_code = cbytes;
pthread_mutex_unlock(&context->parent_context->count_mutex);
break;
}
if (compress && !(flags & BLOSC_MEMCPYED)) {
/* Start critical section */
pthread_mutex_lock(&context->parent_context->count_mutex);
/* Reserve the output range [ntdest, ntdest+cbytes) for this block */
ntdest = context->parent_context->num_output_bytes;
_sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
context->parent_context->thread_giveup_code = 0; /* uncompressible buffer */
pthread_mutex_unlock(&context->parent_context->count_mutex);
break;
}
/* Claim the next block index while still holding the mutex */
context->parent_context->thread_nblock++;
nblock_ = context->parent_context->thread_nblock;
context->parent_context->num_output_bytes += cbytes; /* update return bytes counter */
pthread_mutex_unlock(&context->parent_context->count_mutex);
/* End of critical section */
/* Copy the compressed buffer to destination */
fastcopy(dest + ntdest, tmp2, cbytes);
}
else {
nblock_++;
/* Update counter for this thread */
ntbytes += cbytes;
}
} /* closes while (nblock_) */
/* Sum up all the bytes decompressed */
if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
/* Update global counter for all threads (decompression and memcpy modes) */
pthread_mutex_lock(&context->parent_context->count_mutex);
context->parent_context->num_output_bytes += ntbytes;
pthread_mutex_unlock(&context->parent_context->count_mutex);
}
/* Meeting point for all threads (wait for finalization) */
WAIT_FINISH(NULL, context->parent_context);
}
/* Cleanup our working space and context */
my_free(context->tmp);
my_free(context);
return(NULL);
}
/* Create the pool of context->numthreads worker threads for `context`.
 *
 * Initializes the synchronization objects stored in the context, resets the
 * shared thread sentinels and starts one t_blosc() thread per slot.  Each
 * thread receives its own heap-allocated `struct thread_context` and scratch
 * buffer, which the thread releases when it finishes.
 *
 * Returns 0 on success, or -1 as soon as one pthread_create() call fails.
 * In that case the resources allocated for the thread that could not be
 * started are released here; threads created earlier in the loop are left
 * as they are.
 */
static int init_threads(struct blosc_context* context)
{
  int32_t tid;
  int rc2;
  int32_t ebsize;
  struct thread_context* thread_context;

  /* Initialize mutex and condition variable objects */
  pthread_mutex_init(&context->count_mutex, NULL);

  /* Set context thread sentinels */
  context->thread_giveup_code = 1;
  context->thread_nblock = -1;

  /* Barrier initialization (the +1 accounts for the calling thread) */
#ifdef _POSIX_BARRIERS_MINE
  pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
  pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
#else
  pthread_mutex_init(&context->count_threads_mutex, NULL);
  pthread_cond_init(&context->count_threads_cv, NULL);
  context->count_threads = 0;      /* Reset threads counter */
#endif

#if !defined(_WIN32)
  /* Initialize the thread attributes and request joinable threads */
  pthread_attr_init(&context->ct_attr);
  pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
#endif

  /* Finally, create the threads */
  for (tid = 0; tid < context->numthreads; tid++) {
    context->tids[tid] = tid;

    /* Create a thread context; the thread owns it and destroys it when finished */
    thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
    thread_context->parent_context = context;
    thread_context->tid = tid;

    /* Scratch space: [tmp | tmp2 | tmp3] = [blocksize | ebsize | blocksize] */
    ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
    thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
    thread_context->tmp2 = thread_context->tmp + context->blocksize;
    thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
    thread_context->tmpblocksize = context->blocksize;

#if !defined(_WIN32)
    rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
#else
    rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
#endif
    if (rc2) {
      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
      fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
      /* No thread was started for this slot, so nothing else will ever
         release what was just allocated for it. */
      my_free(thread_context->tmp);
      my_free(thread_context);
      return(-1);
    }
  }

  return(0);
}
/* Return the number of threads currently configured for the global context. */
int blosc_get_nthreads(void)
{
  return g_threads;
}
/* Set the number of threads used by the global context and return the
   previous setting.  When the value actually changes, the library is torn
   down and initialized again before the new value is stored. */
int blosc_set_nthreads(int nthreads_new)
{
  const int previous = g_threads;

  /* Check if should initialize */
  if (!g_initlib) {
    blosc_init();
  }

  if (nthreads_new == previous) {
    return previous;
  }

  /* Re-initialize Blosc */
  blosc_destroy();
  blosc_init();
  g_threads = nthreads_new;

  return previous;
}
/* Make the thread pool of `context` match context->numthreads.

   Returns context->numthreads on success, or -1 when the requested count is
   not in the range 1..BLOSC_MAX_THREADS or the pool could not be created. */
int blosc_set_nthreads_(struct blosc_context* context)
{
  int pool_is_stale;

  if (context->numthreads <= 0) {
    fprintf(stderr, "Error.  nthreads must be a positive integer");
    return -1;
  }
  if (context->numthreads > BLOSC_MAX_THREADS) {
    fprintf(stderr,
            "Error.  nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
            BLOSC_MAX_THREADS);
    return -1;
  }

  /* A pool is only needed for more than one thread, and only has to be
     rebuilt when its size differs from the requested one */
  pool_is_stale = (context->numthreads != context->threads_started);
  if (context->numthreads > 1 && pool_is_stale) {
    blosc_release_threadpool(context);
    if (init_threads(context) < 0) {
      return -1;
    }
  }

  /* Record how many threads are now considered started */
  context->threads_started = context->numthreads;

  return context->numthreads;
}
/* Return the name of the compressor currently selected for the global
   context, as resolved by blosc_compcode_to_compname(). */
const char* blosc_get_compressor(void)
{
  const char* current_name;

  blosc_compcode_to_compname(g_compressor, &current_name);

  return current_name;
}
/* Select the compressor used by the global context by name.  The code
   obtained from blosc_compname_to_compcode() is stored as-is and returned. */
int blosc_set_compressor(const char *compname)
{
  const int selected = blosc_compname_to_compcode(compname);

  g_compressor = selected;

  /* Check if should initialize */
  if (!g_initlib) {
    blosc_init();
  }

  return selected;
}
/* Return a comma-separated list with the names of the compressors compiled
   into this build.  The string lives in static storage and is built on the
   first call only. */
const char* blosc_list_compressors(void)
{
  static int compressors_list_done = 0;
  static char ret[256];

  if (!compressors_list_done) {
    const char* names[] = {
      BLOSC_BLOSCLZ_COMPNAME,
#if defined(HAVE_LZ4)
      BLOSC_LZ4_COMPNAME,
      BLOSC_LZ4HC_COMPNAME,
#endif /* HAVE_LZ4 */
#if defined(HAVE_SNAPPY)
      BLOSC_SNAPPY_COMPNAME,
#endif /* HAVE_SNAPPY */
#if defined(HAVE_ZLIB)
      BLOSC_ZLIB_COMPNAME,
#endif /* HAVE_ZLIB */
#if defined(HAVE_ZSTD)
      BLOSC_ZSTD_COMPNAME,
#endif /* HAVE_ZSTD */
    };
    size_t count = sizeof(names) / sizeof(names[0]);
    size_t i;

    ret[0] = '\0';
    for (i = 0; i < count; i++) {
      /* Every name but the first is preceded by a separator */
      if (i > 0) {
        strcat(ret, ",");
      }
      strcat(ret, names[i]);
    }
    compressors_list_done = 1;
  }

  return ret;
}
/* Return the Blosc version string (BLOSC_VERSION_STRING). */
const char* blosc_get_version_string(void)
{
  const char* version_text = BLOSC_VERSION_STRING;

  return version_text;
}
/* Report the library that backs the compressor called `compname`.

   On success the library code is returned and, for each of `complib` and
   `version` that is not NULL, a strdup()-allocated string is stored there
   (library name and library version respectively).  The version is
   "unknown" when the library headers expose no version macros.

   When the library is not supported by this build, -1 is returned and the
   non-NULL output pointers are set to NULL. */
int blosc_get_complib_info(const char *compname, char **complib, char **version)
{
  int clibcode;
  const char *clibname;
  const char *clibversion = "unknown";

#if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
  char sbuffer[256];
#endif

  clibcode = compname_to_clibcode(compname);
  clibname = clibcode_to_clibname(clibcode);

  /* Work out the version string for the selected library */
  switch (clibcode) {
  case BLOSC_BLOSCLZ_LIB:
    clibversion = BLOSCLZ_VERSION_STRING;
    break;

#if defined(HAVE_LZ4)
  case BLOSC_LZ4_LIB:
#if defined(LZ4_VERSION_MAJOR)
    sprintf(sbuffer, "%d.%d.%d",
            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
    clibversion = sbuffer;
#endif /* LZ4_VERSION_MAJOR */
    break;
#endif /* HAVE_LZ4 */

#if defined(HAVE_SNAPPY)
  case BLOSC_SNAPPY_LIB:
#if defined(SNAPPY_VERSION)
    sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
    clibversion = sbuffer;
#endif /* SNAPPY_VERSION */
    break;
#endif /* HAVE_SNAPPY */

#if defined(HAVE_ZLIB)
  case BLOSC_ZLIB_LIB:
    clibversion = ZLIB_VERSION;
    break;
#endif /* HAVE_ZLIB */

#if defined(HAVE_ZSTD)
  case BLOSC_ZSTD_LIB:
    sprintf(sbuffer, "%d.%d.%d",
            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
    clibversion = sbuffer;
    break;
#endif /* HAVE_ZSTD */

  default:
    /* Unsupported library */
    if (complib != NULL) *complib = NULL;
    if (version != NULL) *version = NULL;
    return -1;
  }

  /* Hand out heap copies of the strings */
  if (complib != NULL) {
    *complib = strdup(clibname);
  }
  if (version != NULL) {
    *version = strdup(clibversion);
  }

  return clibcode;
}
/* Read the uncompressed size (`nbytes`), compressed size (`cbytes`) and
   `blocksize` stored in the header of a compressed buffer.  All three are
   set to 0 when the buffer does not carry the expected format version. */
void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
                         size_t *cbytes, size_t *blocksize)
{
  uint8_t *header = (uint8_t *)(cbuffer);

  if (header[0] == BLOSC_VERSION_FORMAT) {
    /* Header fields live at fixed offsets */
    *nbytes = (size_t)sw32_(header + 4);       /* uncompressed buffer size */
    *blocksize = (size_t)sw32_(header + 8);    /* block size */
    *cbytes = (size_t)sw32_(header + 12);      /* compressed buffer size */
    return;
  }

  /* Unknown format version */
  *cbytes = 0;
  *blocksize = 0;
  *nbytes = 0;
}
/* Check that a compressed buffer of `cbytes` bytes is consistent with its
   own header, and store the uncompressed size in `*nbytes`.
   Returns 0 when the buffer looks valid and -1 otherwise. */
int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
  size_t stored_cbytes;
  size_t stored_blocksize;
  int consistent;

  /* Too short to even contain a header */
  if (cbytes < BLOSC_MIN_HEADER_LENGTH) {
    return -1;
  }

  blosc_cbuffer_sizes(cbuffer, nbytes, &stored_cbytes, &stored_blocksize);

  /* The header must agree with the caller on the compressed size, and the
     uncompressed size must stay within the supported maximum */
  consistent = (stored_cbytes == cbytes) && (*nbytes <= BLOSC_MAX_BUFFERSIZE);

  return consistent ? 0 : -1;
}
/* Read `typesize` and the three lowest flag bits from the header of a
   compressed buffer.  Both outputs are set to 0 when the buffer does not
   carry the expected format version. */
void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
                            int *flags)
{
  uint8_t *header = (uint8_t *)(cbuffer);

  if (header[0] == BLOSC_VERSION_FORMAT) {
    *flags = (int)header[2] & 7;          /* first three flags */
    *typesize = (size_t)header[3];        /* typesize */
    return;
  }

  /* Unknown format version */
  *typesize = 0;
  *flags = 0;
}
/* Read the two version bytes that open a compressed buffer: the Blosc
   format version (byte 0) and the compressor format version (byte 1). */
void blosc_cbuffer_versions(const void *cbuffer, int *version,
                            int *versionlz)
{
  const uint8_t *header = (const uint8_t *)cbuffer;
  const int format_version = (int)header[0];      /* blosc format version */
  const int codec_version = (int)header[1];       /* Lempel-Ziv compressor format version */

  *version = format_version;
  *versionlz = codec_version;
}
/* Return the name of the compressor library/format recorded in the header
   of a compressed buffer, as resolved by clibcode_to_clibname(). */
const char *blosc_cbuffer_complib(const void *cbuffer)
{
  uint8_t *header = (uint8_t *)(cbuffer);
  int clibcode;

  /* The format code occupies the three highest bits of the flags byte */
  clibcode = (header[2] >> 5) & 0x07;

  return clibcode_to_clibname(clibcode);
}
/* Return the blocksize that compression is forced to use.  A value of 0
   means that an automatic blocksize is computed internally. */
int blosc_get_blocksize(void)
{
  const int forced = (int)g_force_blocksize;

  return forced;
}
/* Force compression to use the given blocksize.  Passing 0 restores the
   default, where an automatic blocksize is used. */
void blosc_set_blocksize(size_t size)
{
  const int32_t forced = (int32_t)size;

  g_force_blocksize = forced;
}
/* Select the split mode used for subsequent compressions by storing `mode`
   in the global setting. */
void blosc_set_splitmode(int mode)
{
  const int requested = mode;

  g_splitmode = requested;
}
/* Fork handler run in the child process.
 *
 * After a fork the child's global context is invalid and the pool threads no
 * longer exist.  The stale global context and the global context mutex are
 * discarded and the library is marked as uninitialized, so that the next
 * call through a `blosc_*` interface triggers a fresh initialization.
 *
 * All pthread interfaces have undefined behavior in child handler in current
 * posix standards: http://pubs.opengroup.org/onlinepubs/9699919799/
 */
void blosc_atfork_child(void) {
  if (g_initlib) {
    g_initlib = 0;

    /* Plain frees only: no pthread calls are made on the stale objects */
    my_free(global_comp_mutex);
    global_comp_mutex = NULL;

    my_free(g_global_context);
    g_global_context = NULL;
  }
}
/* Initialize the library: allocate the global compression mutex and the
   global context.  Calling it when already initialized does nothing. */
void blosc_init(void)
{
  if (!g_initlib) {
    global_comp_mutex = (pthread_mutex_t*)my_malloc(sizeof(pthread_mutex_t));
    pthread_mutex_init(global_comp_mutex, NULL);

    g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
    g_global_context->threads_started = 0;

#if !defined(_WIN32)
    /* The atfork handler is registered only once, even though several
     * re-initializations may occur via blosc_destroy/blosc_init. */
    if (!g_atfork_registered) {
      g_atfork_registered = 1;
      pthread_atfork(NULL, NULL, &blosc_atfork_child);
    }
#endif

    g_initlib = 1;
  }
}
/* Tear down the library: release the global thread pool, then free the
   global context and the global compression mutex.  Calling it when the
   library is not initialized does nothing. */
void blosc_destroy(void)
{
  if (g_initlib) {
    g_initlib = 0;

    /* The pool is released before the context that owns it is freed */
    blosc_release_threadpool(g_global_context);
    my_free(g_global_context);
    g_global_context = NULL;

    pthread_mutex_destroy(global_comp_mutex);
    my_free(global_comp_mutex);
    global_comp_mutex = NULL;
  }
}
/* Stop and join the worker threads of `context`, then destroy the
   synchronization objects that belong to the pool.  When no threads are
   recorded as started, only context->threads_started is reset.
   Returns 0 on the paths visible in this function; the WAIT_INIT macro is
   invoked with -1 as its first argument. */
int blosc_release_threadpool(struct blosc_context* context)
{
  int32_t idx;
  void* exit_status;
  int rc;
  int join_rc;
  (void)rc;  // just to avoid 'unused-variable' warning

  if (context->threads_started <= 0) {
    /* Nothing to release */
    context->threads_started = 0;
    return 0;
  }

  /* Tell all existing threads to finish */
  context->end_threads = 1;

  /* Sync threads */
  WAIT_INIT(-1, context);

  /* Join exiting threads */
  idx = 0;
  while (idx < context->threads_started) {
    join_rc = pthread_join(context->threads[idx], &exit_status);
    if (join_rc) {
      fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", join_rc);
      fprintf(stderr, "\tError detail: %s\n", strerror(join_rc));
    }
    idx++;
  }

  /* Release mutex and condition variable objects */
  pthread_mutex_destroy(&context->count_mutex);

  /* Barriers */
#ifdef _POSIX_BARRIERS_MINE
  pthread_barrier_destroy(&context->barr_init);
  pthread_barrier_destroy(&context->barr_finish);
#else
  pthread_mutex_destroy(&context->count_threads_mutex);
  pthread_cond_destroy(&context->count_threads_cv);
#endif

  /* Thread attributes */
#if !defined(_WIN32)
  pthread_attr_destroy(&context->ct_attr);
#endif

  context->threads_started = 0;

  return 0;
}
/* Release the thread pool held by the global context.
   Returns -1 when the library is not initialized, otherwise the result of
   blosc_release_threadpool(). */
int blosc_free_resources(void)
{
  return g_initlib ? blosc_release_threadpool(g_global_context) : -1;
}