From 5e570e026aa283d4e8e6da61545cf31fbe21010f Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sun, 15 Mar 2026 15:42:24 +0100 Subject: [PATCH 1/9] add zstd support --- R/fread.R | 22 ++++++- R/fwrite.R | 11 +++- configure | 28 ++++++++- inst/tests/tests.Rraw | 24 +++++++ src/Makevars.in | 4 +- src/data.table.h | 5 +- src/fwrite.c | 141 +++++++++++++++++++++++++++++++++--------- src/fwrite.h | 2 + src/fwriteR.c | 4 ++ src/init.c | 3 + src/utils.c | 90 +++++++++++++++++++++++++++ 11 files changed, 298 insertions(+), 36 deletions(-) diff --git a/R/fread.R b/R/fread.R index bc8509c713..8a1680f27a 100644 --- a/R/fread.R +++ b/R/fread.R @@ -135,6 +135,20 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") }) file = decompFile # don't use 'tmpFile' symbol again, as tmpFile might be the http://domain.org/file.csv.gz download } + + zstdsig = FALSE + if ((endsWithAny(file, ".zst")) || (zstdsig <- is_zstd(file_signature))) { + if (!haszstd()) + stopf("To read .zst files, fread() requires zstd library support. data.table was compiled without zstd. Please reinstall data.table after installing the zstd development library (libzstd-dev on Debian/Ubuntu, libzstd-devel on Fedora/EPEL, zstd on Homebrew).") # nocov + decompFile = tempfile(tmpdir=tmpdir) + on.exit(unlink(decompFile), add=TRUE) + tryCatch({ + .Call(Cdt_zstd_decompress, file, decompFile) + }, error = function(e) { + stopf("zstd decompression of file '%s' failed:\n %s\nThis can happen when the disk is full in the temporary directory ('%s'). See ?fread for the tmpdir argument.", file, conditionMessage(e), tmpdir) + }) + file = decompFile + } file = enc2native(file) # CfreadR cannot handle UTF-8 if that is not the native encoding, see #3078. input = file @@ -396,7 +410,8 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") known_signatures = list( zip = as.raw(c(0x50, 0x4b, 0x03, 0x04)), # charToRaw("PK\x03\x04") gzip = as.raw(c(0x1F, 0x8B)), - bzip = as.raw(c(0x42, 0x5A, 0x68)) + bzip = as.raw(c(0x42, 0x5A, 0x68)), + zstd = as.raw(c(0x28, 0xB5, 0x2F, 0xFD)) # zstd frame magic (little-endian 0xFD2FB528) ) # https://en.wikipedia.org/wiki/ZIP_(file_format)#File_headers @@ -417,6 +432,11 @@ is_bzip = function(file_signature) { isTRUE(file_signature[4L] %in% charToRaw('123456789')) # for #6304 } +# https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames +is_zstd = function(file_signature) { + identical(file_signature[1:4], known_signatures$zstd) +} + # simplified but faster version of `factor()` for internal use. as_factor = function(x) { lev = forderv(x, retGrp = TRUE, na.last = NA) diff --git a/R/fwrite.R b/R/fwrite.R index 5d91b4e347..e0781cb9e9 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -8,8 +8,9 @@ fwrite = function(x, file="", append=FALSE, quote="auto", dateTimeAs = c("ISO","squash","epoch","write.csv"), buffMB=8L, nThread=getDTthreads(verbose), showProgress=getOption("datatable.showProgress", interactive()), - compress = c("auto", "none", "gzip"), + compress = c("auto", "none", "gzip", "zstd"), compressLevel = 6L, + zstd_level = 3L, yaml = FALSE, bom = FALSE, verbose=getOption("datatable.verbose", FALSE), @@ -27,6 +28,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", buffMB = as.integer(buffMB) nThread = as.integer(nThread) compressLevel = as.integer(compressLevel) + zstd_level = as.integer(zstd_level) # write.csv default is 'double' so fwrite follows suit. write.table's default is 'escape' # validate arguments if (is.matrix(x)) { # coerce to data.table if input object is matrix @@ -48,8 +50,9 @@ fwrite = function(x, file="", append=FALSE, quote="auto", `dec and sep must be distinct whenever both might be needed` = (!NROW(x) || NCOL(x) <= 1L || dec != sep), # sep2!=dec and sep2!=sep checked at C level when we know if list columns are present is.character(eol) && length(eol)==1L, length(qmethod) == 1L && qmethod %chin% c("double", "escape"), - length(compress) == 1L && compress %chin% c("auto", "none", "gzip"), + length(compress) == 1L && compress %chin% c("auto", "none", "gzip", "zstd"), length(compressLevel) == 1L && 0L <= compressLevel && compressLevel <= 9L, + length(zstd_level) == 1L && 1L <= zstd_level && zstd_level <= 22L, isTRUEorFALSE(col.names), isTRUEorFALSE(append), isTRUEorFALSE(row.names), isTRUEorFALSE(verbose), isTRUEorFALSE(showProgress), isTRUEorFALSE(logical01), isTRUEorFALSE(bom), isTRUEorFALSE(forceDecimal), @@ -60,6 +63,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", ) is_gzip = compress == "gzip" || (compress == "auto" && endsWithAny(file, ".gz")) + is_zstd = compress == "zstd" || (compress == "auto" && endsWithAny(file, ".zst")) file = path.expand(file) # "~/foo/bar" if (append && (file=="" || file.exists(file))) { @@ -123,8 +127,9 @@ fwrite = function(x, file="", append=FALSE, quote="auto", } .Call(CfwriteR, x, file, sep, sep2, eol, na, dec, quote, qmethod=="escape", append, row.names, col.names, logical01, scipen, dateTimeAs, buffMB, nThread, - showProgress, is_gzip, compressLevel, bom, yaml, verbose, encoding, forceDecimal) + showProgress, is_gzip, compressLevel, is_zstd, zstd_level, bom, yaml, verbose, encoding, forceDecimal) invisible() } haszlib = function() .Call(Cdt_has_zlib) +haszstd = function() .Call(Cdt_has_zstd) diff --git a/configure b/configure index 44c1df2c45..9132a6dc9e 100755 --- a/configure +++ b/configure @@ -175,7 +175,7 @@ sed -e "s|@PKG_CFLAGS@|$PKG_CFLAGS|" -e "s|@PKG_LIBS@|$PKG_LIBS|" src/Makevars.i # optional dependency on zlib if [ "$NOZLIB" = "1" ]; then - echo "*** Compilation without compression support in fwrite" + echo "*** Compilation without gzip compression support in fwrite" sed -e "s|@zlib_cflags@|-DNOZLIB|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars sed -e "s|@zlib_libs@||" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars else @@ -183,4 +183,30 @@ else sed -e "s|@zlib_libs@|${lib}|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars fi +# optional dependency on zstd +NOZSTD=1 +pkg-config --exists libzstd +if [ $? -ne 0 ]; then + echo "*** pkg-config cannot find libzstd; zstd compression will be disabled." + echo "*** To enable zstd support, install the zstd development library:" + echo "*** deb: libzstd-dev (Debian, Ubuntu, ...)" + echo "*** rpm: libzstd-devel (Fedora, EPEL, ...)" + echo "*** brew: zstd (macOS)" +else + NOZSTD=0 + zstd_lib=`pkg-config --libs libzstd` + zstd_cflag=`pkg-config --cflags libzstd` + zstd_version=`pkg-config --modversion libzstd` + echo "zstd ${zstd_version} is available ok" +fi + +if [ "$NOZSTD" = "1" ]; then + echo "*** Compilation without zstd compression support in fwrite/fread" + sed -e "s|@zstd_cflags@|-DNOZSTD|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars + sed -e "s|@zstd_libs@||" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars +else + sed -e "s|@zstd_cflags@|${zstd_cflag}|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars + sed -e "s|@zstd_libs@|${zstd_lib}|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars +fi + exit 0 diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 727df891cf..f86363e2cf 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -85,6 +85,7 @@ if (exists("test.data.table", .GlobalEnv, inherits=FALSE)) { which.last = data.table:::which.last `-.IDate` = data.table:::`-.IDate` haszlib = data.table:::haszlib + haszstd = data.table:::haszstd # Also, for functions that are masked by other packages, we need to map the data.table one. Or else, # the other package's function would be picked up. As above, we only need to do this because we desire @@ -9763,6 +9764,29 @@ if (haszlib()) { unlink(f) } +# fwrite/fread zstd compress # mirror tests from above +if (!haszstd()) { + test(1658.601, fwrite(data.table(a=1), file=tempfile(), compress="zstd"), error="header files were not found at the time data.table was compiled") +} else { + test(1658.61, fwrite(data.table(a=c(1:3), b=c(1:3)), compress="zstd"), output='a,b\n1,1\n2,2\n3,3') # compress ignored on console + DT = data.table(a=rep(1:2,each=100), b=rep(1:4,each=25)) + test(1658.621, fwrite(DT, file=f1<-tempfile(fileext=".zst"), verbose=TRUE), NULL, + output="args.nrow=200 args.ncol=2.*maxLineLen=5[12].*Writing 200 rows in 1 batches of 200 rows.*nth=1") + test(1658.622, fwrite(DT, file=f2<-tempfile()), NULL) + test(1658.623, file.info(f1)$size < file.info(f2)$size) + test(1658.63, fread(f1), DT) + fwrite(DT, file=f3<-tempfile(), compress="zstd") + fwrite(DT, file=f4<-tempfile(), compress="zstd", zstd_level=1) + fwrite(DT, file=f5<-tempfile(), compress="zstd", zstd_level=22) + fwrite(DT, file=f6<-tempfile(), compress="zstd", col.names=FALSE) + test(1658.641, file.info(f3)$size, file.info(f1)$size) + test(1658.642, file.info(f4)$size >= file.info(f1)$size) + test(1658.643, file.info(f1)$size >= file.info(f5)$size) + test(1658.644, fread(f6, col.names=c("a","b")), DT) + test(1658.645, fread(f3), DT) + unlink(c(f1,f2,f3,f4,f5,f6)) +} + # complex column support for fwrite, part of #3690 DT = data.table(a=1:3, z=0:2 - (2:0)*1i) test(1658.55, fwrite(DT), output='a,z\n1,0-2i\n2,1-1i\n3,2+0i') diff --git a/src/Makevars.in b/src/Makevars.in index 500427c226..eef6b2f98e 100644 --- a/src/Makevars.in +++ b/src/Makevars.in @@ -1,5 +1,5 @@ -PKG_CFLAGS = $(C_VISIBILITY) @PKG_CFLAGS@ @zlib_cflags@ -PKG_LIBS = $(C_VISIBILITY) @PKG_LIBS@ @zlib_libs@ +PKG_CFLAGS = $(C_VISIBILITY) @PKG_CFLAGS@ @zlib_cflags@ @zstd_cflags@ +PKG_LIBS = $(C_VISIBILITY) @PKG_LIBS@ @zlib_libs@ @zstd_libs@ # See WRE $1.2.1.1. But retain user supplied PKG_* too, #4664. # WRE states ($1.6) that += isn't portable and that we aren't allowed to use it. # Otherwise we could use the much simpler PKG_LIBS += @openmp_cflags@ -lz. diff --git a/src/data.table.h b/src/data.table.h index e7ccc55d38..d2d4d4873a 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -413,7 +413,7 @@ SEXP chmatch_R(SEXP, SEXP, SEXP); SEXP chmatchdup_R(SEXP, SEXP, SEXP); SEXP chin_R(SEXP, SEXP); SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); -SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rbindlist(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP setlistelt(SEXP, SEXP, SEXP); SEXP setS4elt(SEXP, SEXP, SEXP); @@ -468,6 +468,9 @@ SEXP allNAR(SEXP); SEXP test_dt_win_snprintf(void); SEXP dt_zlib_version(void); SEXP dt_has_zlib(void); +SEXP dt_zstd_version(void); +SEXP dt_has_zstd(void); +SEXP dt_zstd_decompress(SEXP, SEXP); SEXP startsWithAny(SEXP, SEXP, SEXP); SEXP convertDate(SEXP, SEXP); SEXP fastmean(SEXP); diff --git a/src/fwrite.c b/src/fwrite.c index 6709364230..e0ef276651 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -10,6 +10,10 @@ #ifndef NOZLIB #include // for compression to .gz #endif +#ifndef NOZSTD +#include // for compression to .zst +#include // for ZSTD_ErrorCode, ZSTD_getErrorString +#endif #ifdef WIN32 #include @@ -42,6 +46,7 @@ static int scipen; static bool squashDateTime=false; // 0=ISO(yyyy-mm-dd) 1=squash(yyyymmdd) static bool verbose=false; static int gzip_level; +static int zstd_level; static bool forceDecimal=false; // force writing decimal points for numeric columns extern const char *getString(const void *, int64_t); @@ -630,6 +635,7 @@ void fwriteMain(fwriteMainArgs args) int8_t quoteHeaders = args.doQuote; verbose = args.verbose; gzip_level = args.gzip_level; + zstd_level = args.zstd_level; forceDecimal = args.forceDecimal; size_t len; @@ -640,6 +646,10 @@ void fwriteMain(fwriteMainArgs args) if (args.is_gzip) STOP(_("Compression in fwrite uses zlib library. Its header files were not found at the time data.table was compiled. To enable fwrite compression, please reinstall data.table and study the output for further guidance.")); // # nocov #endif +#ifdef NOZSTD + if (args.is_zstd) + STOP(_("Compression in fwrite uses zstd library. Its header files were not found at the time data.table was compiled. To enable zstd compression, please reinstall data.table and study the output for further guidance.")); // # nocov +#endif // When NA is a non-empty string, then we must quote all string fields in case they contain the na string // na is recommended to be empty, though @@ -721,6 +731,7 @@ void fwriteMain(fwriteMainArgs args) if (*args.filename == '\0') { f = -1; // file="" means write to standard output args.is_gzip = false; // gzip is only for file + args.is_zstd = false; // zstd is only for file } else { #ifdef WIN32 f = _open(args.filename, _O_WRONLY | _O_BINARY | _O_CREAT | (args.append ? _O_APPEND : _O_TRUNC), _S_IWRITE); @@ -811,13 +822,14 @@ void fwriteMain(fwriteMainArgs args) buffSize / MEGA, nth, errno, strerror(errno)); // # nocov } - // init compress variables -#ifndef NOZLIB - z_stream strm; - // NB: fine to free() this even if unallocated + // init compress variables (shared across gzip and zstd) + // NB: fine to free() this even if unallocated (free(NULL) is safe) char *zbuffPool = NULL; size_t zbuffSize = 0; size_t compress_len = 0; + +#ifndef NOZLIB + z_stream strm; if (args.is_gzip) { // compute zbuffSize which is the same for each thread if (init_stream(&strm) != Z_OK) { @@ -829,25 +841,40 @@ void fwriteMain(fwriteMainArgs args) zbuffSize = deflateBound(&strm, buffSize); if (verbose) DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize); + } +#endif // #NOZLIB - // alloc nth zlib buffers +#ifndef NOZSTD + if (args.is_zstd) { + zbuffSize = ZSTD_compressBound(buffSize); + if (verbose) + DTPRINT(_("zbuffSize=%d returned from ZSTD_compressBound\n"), (int)zbuffSize); + } +#endif // #NOZSTD + + if (zbuffSize) { + // alloc nth compressed-output buffers (one per thread) // if headerLen > nth * zbuffSize (long variable names and 1 thread), alloc headerLen alloc_size = nth * zbuffSize < headerLen ? headerLen : nth * zbuffSize; - if (verbose) { + if (verbose) DTPRINT(_("Allocate %zu bytes (%zu MiB) for zbuffPool\n"), alloc_size, alloc_size / MEGA); - } zbuffPool = malloc(alloc_size); if (!zbuffPool) { // # nocov start free(buffPool); - deflateEnd(&strm); +#ifndef NOZLIB + if (args.is_gzip) deflateEnd(&strm); +#endif STOP(_("Unable to allocate %zu MiB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options."), zbuffSize / MEGA, nth, errno, strerror(errno)); // # nocov end } + } + +#ifndef NOZLIB + if (args.is_gzip) { len = 0; crc = crc32(0L, Z_NULL, 0); - if (f != -1) { // write minimal gzip header, but not on the console static const char header[] = "\037\213\10\0\0\0\0\0\0\3"; @@ -868,6 +895,11 @@ void fwriteMain(fwriteMainArgs args) } #endif // #NOZLIB +#ifndef NOZSTD + if (args.is_zstd) + len = 0; +#endif + // write header // use buffPool and zbuffPool because we ensure that allocation is minimum headerLen @@ -912,7 +944,6 @@ void fwriteMain(fwriteMainArgs args) #ifndef NOZLIB if (args.is_gzip) { char* zbuff = zbuffPool; - size_t zbuffUsed = zbuffSize; len = (size_t)(ch - buff); crc = crc32(crc, (unsigned char*)buff, len); @@ -922,21 +953,35 @@ void fwriteMain(fwriteMainArgs args) ret2 = WRITE(f, zbuff, (int)zbuffUsed); compress_len += zbuffUsed; } - } else { + } else #endif - ret2 = WRITE(f, buff, (int)(ch - buff)); -#ifndef NOZLIB - } +#ifndef NOZSTD + if (args.is_zstd) { + char* zbuff = zbuffPool; + size_t mylen_hdr = (size_t)(ch - buff); + size_t zbuffUsed = ZSTD_compress(zbuff, zbuffSize, buff, mylen_hdr, zstd_level); + if (ZSTD_isError(zbuffUsed)) { + ret1 = (int)ZSTD_getErrorCode(zbuffUsed); // # nocov + } else { + len += mylen_hdr; + ret2 = WRITE(f, zbuff, (int)zbuffUsed); + compress_len += zbuffUsed; + } + } else #endif + { + ret2 = WRITE(f, buff, (int)(ch - buff)); + } if (ret1 || ret2 == -1) { // # nocov start int errwrite = errno; // capture write errno now in case close fails with a different errno CLOSE(f); free(buffPool); -#ifndef NOZLIB free(zbuffPool); +#ifndef NOZLIB + if (args.is_gzip) deflateEnd(&strm); #endif - if (ret1) STOP(_("Failed to compress gzip. compressbuff() returned %d"), ret1); + if (ret1) STOP(_("Failed to compress header. Error code: %d"), ret1); else STOP(_("%s: '%s'"), strerror(errwrite), args.filename); // # nocov end } @@ -956,9 +1001,7 @@ void fwriteMain(fwriteMainArgs args) if (verbose) DTPRINT(_("No data rows present (nrow==0)\n")); free(buffPool); -#ifndef NOZLIB - free(zbuffPool); -#endif + free(zbuffPool); // free(NULL) is safe if (f != -1 && CLOSE(f)) STOP(_("%s: '%s'"), strerror(errno), args.filename); // # nocov return; @@ -983,12 +1026,13 @@ void fwriteMain(fwriteMainArgs args) char* myBuff = buffPool + me * buffSize; char* ch = myBuff; + // shared compression output pointer (used by both gzip and zstd) + void *myzBuff = NULL; + size_t myzbuffUsed = 0; #ifndef NOZLIB z_stream mystream; size_t mylen = 0; int mycrc = 0; - void *myzBuff = NULL; - size_t myzbuffUsed = 0; if (args.is_gzip) { myzBuff = zbuffPool + me * zbuffSize; if (init_stream(&mystream) != Z_OK) { // this should be thread safe according to zlib documentation @@ -996,6 +1040,12 @@ void fwriteMain(fwriteMainArgs args) my_failed_compress = -998; // # nocov } } +#endif +#ifndef NOZSTD + size_t myzstd_uncompressed = 0; + if (args.is_zstd) { + myzBuff = zbuffPool + me * zbuffSize; + } #endif if (failed) continue; // Not break. Because we don't use #omp cancel yet. @@ -1047,6 +1097,20 @@ void fwriteMain(fwriteMainArgs args) } } #endif +#ifndef NOZSTD + if (args.is_zstd && !failed) { + myzstd_uncompressed = (size_t)(ch - myBuff); + size_t zstd_result = ZSTD_compress(myzBuff, zbuffSize, myBuff, myzstd_uncompressed, zstd_level); + if (ZSTD_isError(zstd_result)) { + failed = true; + my_failed_compress = (int)ZSTD_getErrorCode(zstd_result); + myzbuffUsed = 0; + myzstd_uncompressed = 0; + } else { + myzbuffUsed = zstd_result; + } + } +#endif // ordered region ---- #pragma omp ordered @@ -1067,6 +1131,12 @@ void fwriteMain(fwriteMainArgs args) ret = WRITE(f, myzBuff, (int)myzbuffUsed); compress_len += myzbuffUsed; } else +#endif +#ifndef NOZSTD + if (args.is_zstd) { + ret = WRITE(f, myzBuff, (int)myzbuffUsed); + compress_len += myzbuffUsed; + } else #endif { ret = WRITE(f, myBuff, (int)(ch - myBuff)); @@ -1082,6 +1152,11 @@ void fwriteMain(fwriteMainArgs args) len += mylen; } #endif +#ifndef NOZSTD + if (args.is_zstd) { + len += myzstd_uncompressed; + } +#endif int used = 100 * ((double)(ch - myBuff)) / buffSize; // percentage of original buffMB if (used > maxBuffUsedPC) @@ -1112,10 +1187,9 @@ void fwriteMain(fwriteMainArgs args) } // end of parallel for loop free(buffPool); + free(zbuffPool); // free(NULL) is safe #ifndef NOZLIB - free(zbuffPool); - /* put a 4-byte integer into a byte array in LSB order */ #define PUT4(a,b) ((a)[0] = (b), (a)[1] = (b) >> 8, (a)[2] = (b) >> 16, (a)[3] = (b) >> 24) @@ -1132,6 +1206,7 @@ void fwriteMain(fwriteMainArgs args) STOP(_("Failed to write gzip trailer")); // # nocov } #endif + // no trailer needed for zstd: each chunk is a complete self-contained frame // Finished parallel region and can call R API safely now. if (hasPrinted) { @@ -1146,12 +1221,15 @@ void fwriteMain(fwriteMainArgs args) } if (verbose) { + if (args.is_gzip || args.is_zstd) { + DTPRINT(_("%s: uncompressed length=%zu (%zu MiB), compressed length=%zu (%zu MiB), ratio=%.1f%%"), + args.is_gzip ? "zlib" : "zstd", // # notranslate + len, len / MEGA, compress_len, compress_len / MEGA, len != 0 ? (100.0 * compress_len) / len : 0); #ifndef NOZLIB - if (args.is_gzip) { - DTPRINT(_("zlib: uncompressed length=%zu (%zu MiB), compressed length=%zu (%zu MiB), ratio=%.1f%%, crc=%x\n"), - len, len / MEGA, compress_len, compress_len / MEGA, len != 0 ? (100.0 * compress_len) / len : 0, crc); - } + if (args.is_gzip) DTPRINT(_(", crc=%x"), crc); #endif + DTPRINT("\n"); // # notranslate + } DTPRINT(Pl_(nth, Pl_(args.nrow, "Wrote %"PRId64" row in %.3f secs using %d thread. MaxBuffUsed=%d%%\n", "Wrote %"PRId64" rows in %.3f secs using %d thread. MaxBuffUsed=%d%%\n"), Pl_(args.nrow, "Wrote %"PRId64" row in %.3f secs using %d threads. MaxBuffUsed=%d%%\n", @@ -1168,11 +1246,18 @@ void fwriteMain(fwriteMainArgs args) if (failed) { // # nocov start #ifndef NOZLIB - if (failed_compress) + if (failed_compress && args.is_gzip) STOP(_("zlib %s (zlib.h %s) deflate() returned error %d Z_FINISH=%d Z_BLOCK=%d. %s"), zlibVersion(), ZLIB_VERSION, failed_compress, Z_FINISH, Z_BLOCK, verbose ? _("Please include the full output above and below this message in your data.table bug report.") : _("Please retry fwrite() with verbose=TRUE and include the full output with your data.table bug report.")); +#endif +#ifndef NOZSTD + if (failed_compress && args.is_zstd) + STOP(_("zstd %s compress failed (error code %d): %s. %s"), + ZSTD_versionString(), failed_compress, ZSTD_getErrorString((ZSTD_ErrorCode)failed_compress), + verbose ? _("Please include the full output above and below this message in your data.table bug report.") + : _("Please retry fwrite() with verbose=TRUE and include the full output with your data.table bug report.")); #endif if (failed_write) STOP("%s: '%s'", strerror(failed_write), args.filename); // # notranslate diff --git a/src/fwrite.h b/src/fwrite.h index 01b795b6fa..2fedfcdf30 100644 --- a/src/fwrite.h +++ b/src/fwrite.h @@ -110,6 +110,8 @@ typedef struct fwriteMainArgs bool showProgress; bool is_gzip; int gzip_level; + bool is_zstd; + int zstd_level; bool bom; const char *yaml; bool verbose; diff --git a/src/fwriteR.c b/src/fwriteR.c index b2d631fb3e..c79da95024 100644 --- a/src/fwriteR.c +++ b/src/fwriteR.c @@ -169,6 +169,8 @@ SEXP fwriteR( SEXP showProgress_Arg, SEXP is_gzip_Arg, SEXP gzip_level_Arg, + SEXP is_zstd_Arg, + SEXP zstd_level_Arg, SEXP bom_Arg, SEXP yaml_Arg, SEXP verbose_Arg, @@ -181,6 +183,8 @@ SEXP fwriteR( fwriteMainArgs args = { 0 }; // { 0 } to quieten valgrind's uninitialized, #4639 args.is_gzip = LOGICAL(is_gzip_Arg)[0]; args.gzip_level = INTEGER(gzip_level_Arg)[0]; + args.is_zstd = LOGICAL(is_zstd_Arg)[0]; + args.zstd_level = INTEGER(zstd_level_Arg)[0]; args.bom = LOGICAL(bom_Arg)[0]; args.yaml = CHAR(STRING_ELT(yaml_Arg, 0)); args.verbose = LOGICAL(verbose_Arg)[0]; diff --git a/src/init.c b/src/init.c index 13421998b4..bcb7b78a04 100644 --- a/src/init.c +++ b/src/init.c @@ -144,6 +144,9 @@ static const R_CallMethodDef callMethods[] = { {"Ctest_dt_win_snprintf", (DL_FUNC)&test_dt_win_snprintf, -1}, {"Cdt_zlib_version", (DL_FUNC)&dt_zlib_version, -1}, {"Cdt_has_zlib", (DL_FUNC)&dt_has_zlib, -1}, + {"Cdt_zstd_version", (DL_FUNC)&dt_zstd_version, -1}, + {"Cdt_has_zstd", (DL_FUNC)&dt_has_zstd, -1}, + {"Cdt_zstd_decompress", (DL_FUNC)&dt_zstd_decompress, 2}, {"Csubstitute_call_arg_namesR", (DL_FUNC)&substitute_call_arg_namesR, -1}, {"CstartsWithAny", (DL_FUNC)&startsWithAny, -1}, {"CconvertDate", (DL_FUNC)&convertDate, -1}, diff --git a/src/utils.c b/src/utils.c index d92dd20f58..3de289e73c 100644 --- a/src/utils.c +++ b/src/utils.c @@ -431,6 +431,96 @@ SEXP dt_has_zlib(void) { #endif } +#ifndef NOZSTD +#include +#include +#endif +SEXP dt_zstd_version(void) { + char out[70]; +#ifndef NOZSTD + snprintf(out, sizeof(out), "ZSTD_VERSION_STRING==%s runtime==%s", ZSTD_VERSION_STRING, ZSTD_versionString()); // # notranslate +#else + snprintf(out, sizeof(out), _("zstd header files were not found when data.table was compiled")); +#endif + return ScalarString(mkChar(out)); +} +SEXP dt_has_zstd(void) { +#ifndef NOZSTD + return ScalarLogical(1); +#else + return ScalarLogical(0); +#endif +} +SEXP dt_zstd_decompress(SEXP infile_sexp, SEXP outfile_sexp) { +#ifndef NOZSTD + const char *infile = CHAR(STRING_ELT(infile_sexp, 0)); + const char *outfile = CHAR(STRING_ELT(outfile_sexp, 0)); + + FILE *fin = fopen(infile, "rb"); + if (!fin) error(_("Cannot open input file for zstd decompression: '%s'"), infile); + + FILE *fout = fopen(outfile, "wb"); + if (!fout) { + fclose(fin); + error(_("Cannot open output file for zstd decompression: '%s'"), outfile); + } + + size_t const buffInSize = ZSTD_DStreamInSize(); + void *buffIn = malloc(buffInSize); + size_t const buffOutSize = ZSTD_DStreamOutSize(); + void *buffOut = malloc(buffOutSize); + + if (!buffIn || !buffOut) { + // # nocov start + free(buffIn); free(buffOut); + fclose(fin); fclose(fout); + error(_("Failed to allocate buffers for zstd decompression")); + // # nocov end + } + + ZSTD_DStream *dstream = ZSTD_createDStream(); + if (!dstream) { + // # nocov start + free(buffIn); free(buffOut); + fclose(fin); fclose(fout); + error(_("Failed to create zstd decompression stream")); + // # nocov end + } + ZSTD_initDStream(dstream); + + size_t read; + while ((read = fread(buffIn, 1, buffInSize, fin)) > 0) { + ZSTD_inBuffer input = { buffIn, read, 0 }; + while (input.pos < input.size) { + ZSTD_outBuffer output = { buffOut, buffOutSize, 0 }; + size_t ret = ZSTD_decompressStream(dstream, &output, &input); + if (ZSTD_isError(ret)) { + ZSTD_freeDStream(dstream); + free(buffIn); free(buffOut); + fclose(fin); fclose(fout); + error(_("zstd decompression error: %s"), ZSTD_getErrorName(ret)); + } + if (fwrite(buffOut, 1, output.pos, fout) != output.pos) { + // # nocov start + ZSTD_freeDStream(dstream); + free(buffIn); free(buffOut); + fclose(fin); fclose(fout); + error(_("Failed to write decompressed data to '%s'"), outfile); + // # nocov end + } + } + } + + ZSTD_freeDStream(dstream); + free(buffIn); free(buffOut); + fclose(fin); fclose(fout); + return R_NilValue; +#else + error(_("zstd header files were not found when data.table was compiled")); // # nocov + return R_NilValue; // # nocov +#endif +} + SEXP startsWithAny(const SEXP x, const SEXP y, SEXP start) { // for is_url in fread.R added in #5097 // basically any(startsWith()), short and simple ascii-only From d627e3bfc5739db991e8a2802acb45851c02a17e Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 16 Mar 2026 14:30:36 +0100 Subject: [PATCH 2/9] unify zstd level detection --- R/fread.R | 50 ++++++++++++++++++++++++-------------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/R/fread.R b/R/fread.R index 8a1680f27a..53d1a5f8df 100644 --- a/R/fread.R +++ b/R/fread.R @@ -119,35 +119,33 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") } gzsig = FALSE - if ((w <- endsWithAny(file, c(".gz", ".bgz",".bz2"))) || (gzsig <- is_gzip(file_signature)) || is_bzip(file_signature)) { - if (!requireNamespace("R.utils", quietly = TRUE)) - stopf("To read %s files directly, fread() requires 'R.utils' package which cannot be found. Please install 'R.utils' using 'install.packages('R.utils')'.", if (w<=2L || gzsig) "gz" else "bz2") # nocov - # not worth doing a behavior test here, so just use getRversion(). - if (packageVersion("R.utils") < "2.13.0" && base::getRversion() >= "4.5.0") - stopf("Reading compressed files in fread requires R.utils version 2.13.0 or higher. Please upgrade R.utils.") # nocov - FUN = if (w<=2L || gzsig) gzfile else bzfile - decompFile = tempfile(tmpdir=tmpdir) - on.exit(unlink(decompFile), add=TRUE) - tryCatch({ - R.utils::decompressFile(file, decompFile, ext=NULL, FUN=FUN, remove=FALSE) # ext is not used by decompressFile when destname is supplied, but isn't optional - }, error = function(e) { - stopf("R.utils::decompressFile failed to decompress file '%s':\n %s\n. This can happen when the disk is full in the temporary directory ('%s'). See ?fread for the tmpdir argument.", file, conditionMessage(e), tmpdir) - }) - file = decompFile # don't use 'tmpFile' symbol again, as tmpFile might be the http://domain.org/file.csv.gz download - } - zstdsig = FALSE - if ((endsWithAny(file, ".zst")) || (zstdsig <- is_zstd(file_signature))) { - if (!haszstd()) - stopf("To read .zst files, fread() requires zstd library support. data.table was compiled without zstd. Please reinstall data.table after installing the zstd development library (libzstd-dev on Debian/Ubuntu, libzstd-devel on Fedora/EPEL, zstd on Homebrew).") # nocov + if ((w <- endsWithAny(file, c(".gz", ".bgz",".bz2", ".zst"))) || (gzsig <- is_gzip(file_signature)) || is_bzip(file_signature) || + (zstdsig <- is_zstd(file_signature))) { decompFile = tempfile(tmpdir=tmpdir) on.exit(unlink(decompFile), add=TRUE) - tryCatch({ - .Call(Cdt_zstd_decompress, file, decompFile) - }, error = function(e) { - stopf("zstd decompression of file '%s' failed:\n %s\nThis can happen when the disk is full in the temporary directory ('%s'). See ?fread for the tmpdir argument.", file, conditionMessage(e), tmpdir) - }) - file = decompFile + if (w==4L || zstdsig) { + if (!haszstd()) + stopf("To read .zst files, fread() requires zstd library support. data.table was compiled without zstd. Please reinstall data.table after installing the zstd development library (libzstd-dev on Debian/Ubuntu, libzstd-devel on Fedora/EPEL, zstd on Homebrew).") # nocov + tryCatch({ + .Call(Cdt_zstd_decompress, file, decompFile) + }, error = function(e) { + stopf("zstd decompression of file '%s' failed:\n %s\nThis can happen when the disk is full in the temporary directory ('%s'). See ?fread for the tmpdir argument.", file, conditionMessage(e), tmpdir) + }) + } else { + if (!requireNamespace("R.utils", quietly = TRUE)) + stopf("To read %s files directly, fread() requires 'R.utils' package which cannot be found. Please install 'R.utils' using 'install.packages('R.utils')'.", if (w<=2L || gzsig) "gz" else "bz2") # nocov + # not worth doing a behavior test here, so just use getRversion(). + if (packageVersion("R.utils") < "2.13.0" && base::getRversion() >= "4.5.0") + stopf("Reading compressed files in fread requires R.utils version 2.13.0 or higher. Please upgrade R.utils.") # nocov + FUN = if (w<=2L || gzsig) gzfile else bzfile + tryCatch({ + R.utils::decompressFile(file, decompFile, ext=NULL, FUN=FUN, remove=FALSE) # ext is not used by decompressFile when destname is supplied, but isn't optional + }, error = function(e) { + stopf("R.utils::decompressFile failed to decompress file '%s':\n %s\n. This can happen when the disk is full in the temporary directory ('%s'). See ?fread for the tmpdir argument.", file, conditionMessage(e), tmpdir) + }) + } + file = decompFile # don't use 'tmpFile' symbol again, as tmpFile might be the http://domain.org/file.csv.gz download } file = enc2native(file) # CfreadR cannot handle UTF-8 if that is not the native encoding, see #3078. From 1e3ec41bde66f2a0f4ec671881650ef6a2175ec1 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 16 Mar 2026 15:15:46 +0100 Subject: [PATCH 3/9] unify compression levels --- R/fwrite.R | 13 ++++++------- inst/tests/tests.Rraw | 4 ++-- man/fwrite.Rd | 6 +++--- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/R/fwrite.R b/R/fwrite.R index e0781cb9e9..186c587292 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -9,8 +9,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", buffMB=8L, nThread=getDTthreads(verbose), showProgress=getOption("datatable.showProgress", interactive()), compress = c("auto", "none", "gzip", "zstd"), - compressLevel = 6L, - zstd_level = 3L, + compressLevel = NULL, yaml = FALSE, bom = FALSE, verbose=getOption("datatable.verbose", FALSE), @@ -27,8 +26,6 @@ fwrite = function(x, file="", append=FALSE, quote="auto", scipen = if (is.numeric(scipen)) as.integer(scipen) else 0L buffMB = as.integer(buffMB) nThread = as.integer(nThread) - compressLevel = as.integer(compressLevel) - zstd_level = as.integer(zstd_level) # write.csv default is 'double' so fwrite follows suit. write.table's default is 'escape' # validate arguments if (is.matrix(x)) { # coerce to data.table if input object is matrix @@ -51,8 +48,6 @@ fwrite = function(x, file="", append=FALSE, quote="auto", is.character(eol) && length(eol)==1L, length(qmethod) == 1L && qmethod %chin% c("double", "escape"), length(compress) == 1L && compress %chin% c("auto", "none", "gzip", "zstd"), - length(compressLevel) == 1L && 0L <= compressLevel && compressLevel <= 9L, - length(zstd_level) == 1L && 1L <= zstd_level && zstd_level <= 22L, isTRUEorFALSE(col.names), isTRUEorFALSE(append), isTRUEorFALSE(row.names), isTRUEorFALSE(verbose), isTRUEorFALSE(showProgress), isTRUEorFALSE(logical01), isTRUEorFALSE(bom), isTRUEorFALSE(forceDecimal), @@ -64,6 +59,10 @@ fwrite = function(x, file="", append=FALSE, quote="auto", is_gzip = compress == "gzip" || (compress == "auto" && endsWithAny(file, ".gz")) is_zstd = compress == "zstd" || (compress == "auto" && endsWithAny(file, ".zst")) + if (is.null(compressLevel)) compressLevel = if (is_zstd) 3L else 6L + compressLevel = as.integer(compressLevel) + if (is_zstd) { if (compressLevel < 1L || compressLevel > 22L) stopf("compressLevel must be between 1 and 22 for zstd compression.") } + else { if (compressLevel < 0L || compressLevel > 9L) stopf("compressLevel must be between 0 and 9 for gzip compression.") } file = path.expand(file) # "~/foo/bar" if (append && (file=="" || file.exists(file))) { @@ -127,7 +126,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", } .Call(CfwriteR, x, file, sep, sep2, eol, na, dec, quote, qmethod=="escape", append, row.names, col.names, logical01, scipen, dateTimeAs, buffMB, nThread, - showProgress, is_gzip, compressLevel, is_zstd, zstd_level, bom, yaml, verbose, encoding, forceDecimal) + showProgress, is_gzip, compressLevel, is_zstd, bom, yaml, verbose, encoding, forceDecimal) invisible() } diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index f86363e2cf..68245a48f6 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -9776,8 +9776,8 @@ if (!haszstd()) { test(1658.623, file.info(f1)$size < file.info(f2)$size) test(1658.63, fread(f1), DT) fwrite(DT, file=f3<-tempfile(), compress="zstd") - fwrite(DT, file=f4<-tempfile(), compress="zstd", zstd_level=1) - fwrite(DT, file=f5<-tempfile(), compress="zstd", zstd_level=22) + fwrite(DT, file=f4<-tempfile(), compress="zstd", compressLevel=1) + fwrite(DT, file=f5<-tempfile(), compress="zstd", compressLevel=22) fwrite(DT, file=f6<-tempfile(), compress="zstd", col.names=FALSE) test(1658.641, file.info(f3)$size, file.info(f1)$size) test(1658.642, file.info(f4)$size >= file.info(f1)$size) diff --git a/man/fwrite.Rd b/man/fwrite.Rd index 078d225343..434ee29ace 100644 --- a/man/fwrite.Rd +++ b/man/fwrite.Rd @@ -16,8 +16,8 @@ fwrite(x, file = "", append = FALSE, quote = "auto", dateTimeAs = c("ISO","squash","epoch","write.csv"), buffMB = 8L, nThread = getDTthreads(verbose), showProgress = getOption("datatable.showProgress", interactive()), - compress = c("auto", "none", "gzip"), - compressLevel = 6L, + compress = c("auto", "none", "gzip", "zstd"), + compressLevel = NULL, yaml = FALSE, bom = FALSE, verbose = getOption("datatable.verbose", FALSE), @@ -58,7 +58,7 @@ fwrite(x, file = "", append = FALSE, quote = "auto", \item{nThread}{The number of threads to use. Experiment to see what works best for your data on your hardware.} \item{showProgress}{ Display a progress meter on the console? Ignored when \code{file==""}. } \item{compress}{If \code{compress = "auto"} and if \code{file} ends in \code{.gz} then output format is gzipped csv else csv. If \code{compress = "none"}, output format is always csv. If \code{compress = "gzip"} then format is gzipped csv. Output to the console is never gzipped even if \code{compress = "gzip"}. By default, \code{compress = "auto"}.} - \item{compressLevel}{Level of compression between 1 and 9, 6 by default. See \url{https://www.gnu.org/software/gzip/manual/html_node/Invoking-gzip.html} for details.} + \item{compressLevel}{Level of compression. For gzip, an integer between 0 and 9 (default 6). For zstd, an integer between 1 and 22 (default 3). When \code{NULL} (default), the format default is used. See \url{https://www.gnu.org/software/gzip/manual/html_node/Invoking-gzip.html} or \url{https://facebook.github.io/zstd/zstd_manual.html} for details.} \item{yaml}{If \code{TRUE}, \code{fwrite} will output a CSVY file, that is, a CSV file with metadata stored as a YAML header, using \code{\link[yaml]{as.yaml}}. See \code{Details}. } \item{bom}{If \code{TRUE} a BOM (Byte Order Mark) sequence (EF BB BF) is added at the beginning of the file; format 'UTF-8 with BOM'.} \item{verbose}{Be chatty and report timings?} From 691a1a8c1f4078d029b3f8bda796e8eca3fd1202 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 16 Mar 2026 16:57:46 +0100 Subject: [PATCH 4/9] fix signatures --- R/fwrite.R | 2 +- src/data.table.h | 2 +- src/fwrite.c | 12 +++++------- src/fwrite.h | 3 +-- src/fwriteR.c | 6 ++---- 5 files changed, 10 insertions(+), 15 deletions(-) diff --git a/R/fwrite.R b/R/fwrite.R index 186c587292..07631e511d 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -126,7 +126,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto", } .Call(CfwriteR, x, file, sep, sep2, eol, na, dec, quote, qmethod=="escape", append, row.names, col.names, logical01, scipen, dateTimeAs, buffMB, nThread, - showProgress, is_gzip, compressLevel, is_zstd, bom, yaml, verbose, encoding, forceDecimal) + showProgress, is_gzip, is_zstd, compressLevel, bom, yaml, verbose, encoding, forceDecimal) invisible() } diff --git a/src/data.table.h b/src/data.table.h index d2d4d4873a..4aaab3a145 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -413,7 +413,7 @@ SEXP chmatch_R(SEXP, SEXP, SEXP); SEXP chmatchdup_R(SEXP, SEXP, SEXP); SEXP chin_R(SEXP, SEXP); SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); -SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rbindlist(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP setlistelt(SEXP, SEXP, SEXP); SEXP setS4elt(SEXP, SEXP, SEXP); diff --git a/src/fwrite.c b/src/fwrite.c index e0ef276651..3d3da92dca 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -45,8 +45,7 @@ static bool qmethodEscape=false; // when quoting fields, how to escape dou static int scipen; static bool squashDateTime=false; // 0=ISO(yyyy-mm-dd) 1=squash(yyyymmdd) static bool verbose=false; -static int gzip_level; -static int zstd_level; +static int compress_level; static bool forceDecimal=false; // force writing decimal points for numeric columns extern const char *getString(const void *, int64_t); @@ -593,7 +592,7 @@ int init_stream(z_stream *stream) { // Now we manage header and trailer. gzip file is slighty lower with -15 because no header/trailer are // written for each chunk. // For memLevel, 8 is the default value (128 KiB). memLevel=9 uses maximum memory for optimal speed. To be tested ? - int err = deflateInit2(stream, gzip_level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); + int err = deflateInit2(stream, compress_level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); return err; // # nocov } @@ -634,8 +633,7 @@ void fwriteMain(fwriteMainArgs args) doQuote = args.doQuote; int8_t quoteHeaders = args.doQuote; verbose = args.verbose; - gzip_level = args.gzip_level; - zstd_level = args.zstd_level; + compress_level = args.compress_level; forceDecimal = args.forceDecimal; size_t len; @@ -959,7 +957,7 @@ void fwriteMain(fwriteMainArgs args) if (args.is_zstd) { char* zbuff = zbuffPool; size_t mylen_hdr = (size_t)(ch - buff); - size_t zbuffUsed = ZSTD_compress(zbuff, zbuffSize, buff, mylen_hdr, zstd_level); + size_t zbuffUsed = ZSTD_compress(zbuff, zbuffSize, buff, mylen_hdr, compress_level); if (ZSTD_isError(zbuffUsed)) { ret1 = (int)ZSTD_getErrorCode(zbuffUsed); // # nocov } else { @@ -1100,7 +1098,7 @@ void fwriteMain(fwriteMainArgs args) #ifndef NOZSTD if (args.is_zstd && !failed) { myzstd_uncompressed = (size_t)(ch - myBuff); - size_t zstd_result = ZSTD_compress(myzBuff, zbuffSize, myBuff, myzstd_uncompressed, zstd_level); + size_t zstd_result = ZSTD_compress(myzBuff, zbuffSize, myBuff, myzstd_uncompressed, compress_level); if (ZSTD_isError(zstd_result)) { failed = true; my_failed_compress = (int)ZSTD_getErrorCode(zstd_result); diff --git a/src/fwrite.h b/src/fwrite.h index 2fedfcdf30..51eb359541 100644 --- a/src/fwrite.h +++ b/src/fwrite.h @@ -109,9 +109,8 @@ typedef struct fwriteMainArgs int nth; bool showProgress; bool is_gzip; - int gzip_level; bool is_zstd; - int zstd_level; + int compress_level; bool bom; const char *yaml; bool verbose; diff --git a/src/fwriteR.c b/src/fwriteR.c index c79da95024..76de47abc7 100644 --- a/src/fwriteR.c +++ b/src/fwriteR.c @@ -168,9 +168,8 @@ SEXP fwriteR( SEXP nThread_Arg, SEXP showProgress_Arg, SEXP is_gzip_Arg, - SEXP gzip_level_Arg, SEXP is_zstd_Arg, - SEXP zstd_level_Arg, + SEXP compress_level_Arg, SEXP bom_Arg, SEXP yaml_Arg, SEXP verbose_Arg, @@ -182,9 +181,8 @@ SEXP fwriteR( fwriteMainArgs args = { 0 }; // { 0 } to quieten valgrind's uninitialized, #4639 args.is_gzip = LOGICAL(is_gzip_Arg)[0]; - args.gzip_level = INTEGER(gzip_level_Arg)[0]; args.is_zstd = LOGICAL(is_zstd_Arg)[0]; - args.zstd_level = INTEGER(zstd_level_Arg)[0]; + args.compress_level = INTEGER(compress_level_Arg)[0]; args.bom = LOGICAL(bom_Arg)[0]; args.yaml = CHAR(STRING_ELT(yaml_Arg, 0)); args.verbose = LOGICAL(verbose_Arg)[0]; From 1d674d901e419396c9f374d52dc8acb3b2f04549 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 17 Mar 2026 14:55:21 +0100 Subject: [PATCH 5/9] add xzfile support --- R/fread.R | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/R/fread.R b/R/fread.R index 53d1a5f8df..d0508601cc 100644 --- a/R/fread.R +++ b/R/fread.R @@ -120,8 +120,9 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") gzsig = FALSE zstdsig = FALSE - if ((w <- endsWithAny(file, c(".gz", ".bgz",".bz2", ".zst"))) || (gzsig <- is_gzip(file_signature)) || is_bzip(file_signature) || - (zstdsig <- is_zstd(file_signature))) { + xzsig = FALSE + if ((w <- endsWithAny(file, c(".gz", ".bgz", ".bz2", ".zst", ".xz"))) || (gzsig <- is_gzip(file_signature)) || is_bzip(file_signature) || + (zstdsig <- is_zstd(file_signature)) || (xzsig <- is_xz(file_signature))) { decompFile = tempfile(tmpdir=tmpdir) on.exit(unlink(decompFile), add=TRUE) if (w==4L || zstdsig) { @@ -133,12 +134,13 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") stopf("zstd decompression of file '%s' failed:\n %s\nThis can happen when the disk is full in the temporary directory ('%s'). See ?fread for the tmpdir argument.", file, conditionMessage(e), tmpdir) }) } else { + fmt = if (w<=2L || gzsig) "gz" else if (w==5L || xzsig) "xz" else "bz2" if (!requireNamespace("R.utils", quietly = TRUE)) - stopf("To read %s files directly, fread() requires 'R.utils' package which cannot be found. Please install 'R.utils' using 'install.packages('R.utils')'.", if (w<=2L || gzsig) "gz" else "bz2") # nocov + stopf("To read %s files directly, fread() requires 'R.utils' package which cannot be found. Please install 'R.utils' using 'install.packages('R.utils')'.", fmt) # nocov # not worth doing a behavior test here, so just use getRversion(). if (packageVersion("R.utils") < "2.13.0" && base::getRversion() >= "4.5.0") stopf("Reading compressed files in fread requires R.utils version 2.13.0 or higher. Please upgrade R.utils.") # nocov - FUN = if (w<=2L || gzsig) gzfile else bzfile + FUN = if (w<=2L || gzsig) gzfile else if (w==5L || xzsig) xzfile else bzfile tryCatch({ R.utils::decompressFile(file, decompFile, ext=NULL, FUN=FUN, remove=FALSE) # ext is not used by decompressFile when destname is supplied, but isn't optional }, error = function(e) { @@ -409,7 +411,8 @@ known_signatures = list( zip = as.raw(c(0x50, 0x4b, 0x03, 0x04)), # charToRaw("PK\x03\x04") gzip = as.raw(c(0x1F, 0x8B)), bzip = as.raw(c(0x42, 0x5A, 0x68)), - zstd = as.raw(c(0x28, 0xB5, 0x2F, 0xFD)) # zstd frame magic (little-endian 0xFD2FB528) + zstd = as.raw(c(0x28, 0xB5, 0x2F, 0xFD)), # zstd frame magic (little-endian 0xFD2FB528) + xz = as.raw(c(0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00)) # https://tukaani.org/xz/xz-file-format.txt ) # https://en.wikipedia.org/wiki/ZIP_(file_format)#File_headers @@ -435,6 +438,11 @@ is_zstd = function(file_signature) { identical(file_signature[1:4], known_signatures$zstd) } +# https://tukaani.org/xz/xz-file-format.txt section 2.1.1.1 +is_xz = function(file_signature) { + identical(file_signature[1:6], known_signatures$xz) +} + # simplified but faster version of `factor()` for internal use. as_factor = function(x) { lev = forderv(x, retGrp = TRUE, na.last = NA) From 3660fc094c6b1dc6503b10c1aad50a394dd67478 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 17 Mar 2026 16:21:09 +0100 Subject: [PATCH 6/9] add xzfile test --- inst/tests/tests.Rraw | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 68245a48f6..799976a313 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -18501,6 +18501,11 @@ if (test_R.utils) { R.utils::bzip2(tmp, tmp2 <- tempfile(), remove=FALSE) # _not_ with .bz2 extension test(2273.4, fread(tmp2), DT) file.remove(tmp2) + R.utils::xz(tmp, tmp2 <- tempfile(fileext=".xz"), remove=FALSE) + test(2273.5, fread(tmp2), DT) # by .xz extension + R.utils::xz(tmp, tmp3 <- tempfile(), remove=FALSE) # _not_ with .xz extension + test(2273.6, fread(tmp3), DT) # by signature + file.remove(c(tmp2, tmp3)) } file.remove(tmp) From 682bf66e633dbf902f5dbfe8f344cd11b0fddf6e Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 17 Mar 2026 16:22:59 +0100 Subject: [PATCH 7/9] use R_ExecWithCleanup --- src/utils.c | 105 ++++++++++++++++++++++++---------------------------- 1 file changed, 49 insertions(+), 56 deletions(-) diff --git a/src/utils.c b/src/utils.c index 3de289e73c..6e0c2a5f87 100644 --- a/src/utils.c +++ b/src/utils.c @@ -451,70 +451,63 @@ SEXP dt_has_zstd(void) { return ScalarLogical(0); #endif } -SEXP dt_zstd_decompress(SEXP infile_sexp, SEXP outfile_sexp) { #ifndef NOZSTD - const char *infile = CHAR(STRING_ELT(infile_sexp, 0)); - const char *outfile = CHAR(STRING_ELT(outfile_sexp, 0)); - - FILE *fin = fopen(infile, "rb"); - if (!fin) error(_("Cannot open input file for zstd decompression: '%s'"), infile); - - FILE *fout = fopen(outfile, "wb"); - if (!fout) { - fclose(fin); - error(_("Cannot open output file for zstd decompression: '%s'"), outfile); - } - - size_t const buffInSize = ZSTD_DStreamInSize(); - void *buffIn = malloc(buffInSize); - size_t const buffOutSize = ZSTD_DStreamOutSize(); - void *buffOut = malloc(buffOutSize); - - if (!buffIn || !buffOut) { - // # nocov start - free(buffIn); free(buffOut); - fclose(fin); fclose(fout); - error(_("Failed to allocate buffers for zstd decompression")); - // # nocov end - } - - ZSTD_DStream *dstream = ZSTD_createDStream(); - if (!dstream) { - // # nocov start - free(buffIn); free(buffOut); - fclose(fin); fclose(fout); - error(_("Failed to create zstd decompression stream")); - // # nocov end - } - ZSTD_initDStream(dstream); +typedef struct { + const char *infile; + const char *outfile; + FILE *fin; + FILE *fout; + void *buffIn; + void *buffOut; + ZSTD_DStream *dstream; +} zstd_decompress_ctx; + +static void zstd_decompress_cleanup(void *data) { + zstd_decompress_ctx *ctx = (zstd_decompress_ctx *)data; + free(ctx->buffIn); + free(ctx->buffOut); + if (ctx->dstream) ZSTD_freeDStream(ctx->dstream); + if (ctx->fin) fclose(ctx->fin); + if (ctx->fout) fclose(ctx->fout); +} + +static SEXP zstd_decompress_impl(void *data) { + zstd_decompress_ctx *ctx = (zstd_decompress_ctx *)data; + + ctx->fin = fopen(ctx->infile, "rb"); + if (!ctx->fin) error(_("Cannot open input file for zstd decompression: '%s'"), ctx->infile); + + ctx->fout = fopen(ctx->outfile, "wb"); + if (!ctx->fout) error(_("Cannot open output file for zstd decompression: '%s'"), ctx->outfile); + + ctx->buffIn = malloc(ZSTD_DStreamInSize()); + ctx->buffOut = malloc(ZSTD_DStreamOutSize()); + if (!ctx->buffIn || !ctx->buffOut) error(_("Failed to allocate buffers for zstd decompression")); // # nocov + + ctx->dstream = ZSTD_createDStream(); + if (!ctx->dstream) error(_("Failed to create zstd decompression stream")); // # nocov + ZSTD_initDStream(ctx->dstream); size_t read; - while ((read = fread(buffIn, 1, buffInSize, fin)) > 0) { - ZSTD_inBuffer input = { buffIn, read, 0 }; + while ((read = fread(ctx->buffIn, 1, ZSTD_DStreamInSize(), ctx->fin)) > 0) { + ZSTD_inBuffer input = { ctx->buffIn, read, 0 }; while (input.pos < input.size) { - ZSTD_outBuffer output = { buffOut, buffOutSize, 0 }; - size_t ret = ZSTD_decompressStream(dstream, &output, &input); - if (ZSTD_isError(ret)) { - ZSTD_freeDStream(dstream); - free(buffIn); free(buffOut); - fclose(fin); fclose(fout); + ZSTD_outBuffer output = { ctx->buffOut, ZSTD_DStreamOutSize(), 0 }; + size_t ret = ZSTD_decompressStream(ctx->dstream, &output, &input); + if (ZSTD_isError(ret)) error(_("zstd decompression error: %s"), ZSTD_getErrorName(ret)); - } - if (fwrite(buffOut, 1, output.pos, fout) != output.pos) { - // # nocov start - ZSTD_freeDStream(dstream); - free(buffIn); free(buffOut); - fclose(fin); fclose(fout); - error(_("Failed to write decompressed data to '%s'"), outfile); - // # nocov end - } + if (fwrite(ctx->buffOut, 1, output.pos, ctx->fout) != output.pos) + error(_("Failed to write decompressed data to '%s'"), ctx->outfile); // # nocov } } - - ZSTD_freeDStream(dstream); - free(buffIn); free(buffOut); - fclose(fin); fclose(fout); return R_NilValue; +} +#endif + +SEXP dt_zstd_decompress(SEXP infile_sexp, SEXP outfile_sexp) { +#ifndef NOZSTD + zstd_decompress_ctx ctx = { CHAR(STRING_ELT(infile_sexp, 0)), CHAR(STRING_ELT(outfile_sexp, 0)), NULL, NULL, NULL, NULL, NULL }; + return R_ExecWithCleanup(zstd_decompress_impl, &ctx, zstd_decompress_cleanup, &ctx); #else error(_("zstd header files were not found when data.table was compiled")); // # nocov return R_NilValue; // # nocov From d66c5fb3c41f28ac6a06ff80cac7962447b78a70 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 17 Mar 2026 16:32:45 +0100 Subject: [PATCH 8/9] fix tests --- inst/tests/tests.Rraw | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 799976a313..8225187736 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -18501,10 +18501,14 @@ if (test_R.utils) { R.utils::bzip2(tmp, tmp2 <- tempfile(), remove=FALSE) # _not_ with .bz2 extension test(2273.4, fread(tmp2), DT) file.remove(tmp2) - R.utils::xz(tmp, tmp2 <- tempfile(fileext=".xz"), remove=FALSE) - test(2273.5, fread(tmp2), DT) # by .xz extension - R.utils::xz(tmp, tmp3 <- tempfile(), remove=FALSE) # _not_ with .xz extension - test(2273.6, fread(tmp3), DT) # by signature + con = xzfile(tmp2 <- tempfile(fileext=".xz"), open="wt") + write.table(DT, con, sep = ",", row.names = FALSE, col.names = TRUE, quote = FALSE) + close(con) + test(2273.5, fread(tmp2), DT) + con = xzfile(tmp3 <- tempfile(), open="wt") + write.table(DT, con, sep = ",", row.names = FALSE, col.names = TRUE, quote = FALSE) + close(con) + test(2273.6, fread(tmp3), DT) file.remove(c(tmp2, tmp3)) } file.remove(tmp) From 51cc254ac45abb4479ddf9f17286f4c7ddbfebdf Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 17 Mar 2026 17:42:44 +0100 Subject: [PATCH 9/9] remove --- src/fwrite.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 3d3da92dca..13c8e87ab1 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -821,13 +821,13 @@ void fwriteMain(fwriteMainArgs args) } // init compress variables (shared across gzip and zstd) +#ifndef NOZLIB + z_stream strm; // NB: fine to free() this even if unallocated (free(NULL) is safe) char *zbuffPool = NULL; size_t zbuffSize = 0; size_t compress_len = 0; -#ifndef NOZLIB - z_stream strm; if (args.is_gzip) { // compute zbuffSize which is the same for each thread if (init_stream(&strm) != Z_OK) {