diff --git a/R/fread.R b/R/fread.R
index b95e0f9b0..b34364a78 100644
--- a/R/fread.R
+++ b/R/fread.R
@@ -209,20 +209,34 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC")
     }
 
     gzsig = FALSE
-    if ((w <- endsWithAny(file, c(".gz", ".bgz",".bz2"))) || (gzsig <- is_gzip(file_signature)) || is_bzip(file_signature)) {
-      if (!requireNamespace("R.utils", quietly = TRUE))
-        stopf("To read %s files directly, fread() requires 'R.utils' package which cannot be found. Please install 'R.utils' using 'install.packages('R.utils')'.", if (w<=2L || gzsig) "gz" else "bz2") # nocov
-      # not worth doing a behavior test here, so just use getRversion().
-      if (packageVersion("R.utils") < "2.13.0" && base::getRversion() >= "4.5.0")
-        stopf("Reading compressed files in fread requires R.utils version 2.13.0 or higher. Please upgrade R.utils.") # nocov
-      FUN = if (w<=2L || gzsig) gzfile else bzfile
+    zstdsig = FALSE
+    xzsig = FALSE
+    if ((w <- endsWithAny(file, c(".gz", ".bgz", ".bz2", ".zst", ".xz"))) || (gzsig <- is_gzip(file_signature)) || is_bzip(file_signature) ||
+        (zstdsig <- is_zstd(file_signature)) || (xzsig <- is_xz(file_signature))) {
       decompFile = tempfile(tmpdir=tmpdir)
       on.exit(unlink(decompFile), add=TRUE)
-      tryCatch({
-        R.utils::decompressFile(file, decompFile, ext=NULL, FUN=FUN, remove=FALSE) # ext is not used by decompressFile when destname is supplied, but isn't optional
-      }, error = function(e) {
-        stopf("R.utils::decompressFile failed to decompress file '%s':\n %s\n. This can happen when the disk is full in the temporary directory ('%s'). See ?fread for the tmpdir argument.", file, conditionMessage(e), tmpdir)
-      })
+      if (w==4L || zstdsig) {
+        if (!haszstd())
+          stopf("To read .zst files, fread() requires zstd library support. data.table was compiled without zstd. Please reinstall data.table after installing the zstd development library (libzstd-dev on Debian/Ubuntu, libzstd-devel on Fedora/EPEL, zstd on Homebrew).") # nocov
+        tryCatch({
+          .Call(Cdt_zstd_decompress, file, decompFile)
+        }, error = function(e) {
+          stopf("zstd decompression of file '%s' failed:\n %s\nThis can happen when the disk is full in the temporary directory ('%s'). See ?fread for the tmpdir argument.", file, conditionMessage(e), tmpdir)
+        })
+      } else {
+        fmt = if (w %in% 1:2 || gzsig) "gz" else if (w==5L || xzsig) "xz" else "bz2" # not w<=2L: w is 0/FALSE when only a signature matched, which would always select "gz"
+        if (!requireNamespace("R.utils", quietly = TRUE))
+          stopf("To read %s files directly, fread() requires 'R.utils' package which cannot be found. Please install 'R.utils' using 'install.packages('R.utils')'.", fmt) # nocov
+        # not worth doing a behavior test here, so just use getRversion().
+        if (packageVersion("R.utils") < "2.13.0" && base::getRversion() >= "4.5.0")
+          stopf("Reading compressed files in fread requires R.utils version 2.13.0 or higher. Please upgrade R.utils.") # nocov
+        FUN = switch(fmt, gz=gzfile, xz=xzfile, bzfile) # single source of truth: connection constructor follows fmt; bz2 is the fall-through
+        tryCatch({
+          R.utils::decompressFile(file, decompFile, ext=NULL, FUN=FUN, remove=FALSE) # ext is not used by decompressFile when destname is supplied, but isn't optional
+        }, error = function(e) {
+          stopf("R.utils::decompressFile failed to decompress file '%s':\n %s\n. This can happen when the disk is full in the temporary directory ('%s'). See ?fread for the tmpdir argument.", file, conditionMessage(e), tmpdir)
+        })
+      }
       file = decompFile # don't use 'tmpFile' symbol again, as tmpFile might be the http://domain.org/file.csv.gz download
     }
     file = enc2native(file) # CfreadR cannot handle UTF-8 if that is not the native encoding, see #3078.
@@ -486,7 +500,9 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") known_signatures = list( zip = as.raw(c(0x50, 0x4b, 0x03, 0x04)), # charToRaw("PK\x03\x04") gzip = as.raw(c(0x1F, 0x8B)), - bzip = as.raw(c(0x42, 0x5A, 0x68)) + bzip = as.raw(c(0x42, 0x5A, 0x68)), + zstd = as.raw(c(0x28, 0xB5, 0x2F, 0xFD)), # zstd frame magic (little-endian 0xFD2FB528) + xz = as.raw(c(0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00)) # https://tukaani.org/xz/xz-file-format.txt ) # https://en.wikipedia.org/wiki/ZIP_(file_format)#File_headers @@ -507,6 +523,16 @@ is_bzip = function(file_signature) { isTRUE(file_signature[4L] %in% charToRaw('123456789')) # for #6304 } +# https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames +is_zstd = function(file_signature) { + identical(file_signature[1:4], known_signatures$zstd) +} + +# https://tukaani.org/xz/xz-file-format.txt section 2.1.1.1 +is_xz = function(file_signature) { + identical(file_signature[1:6], known_signatures$xz) +} + # simplified but faster version of `factor()` for internal use. as_factor = function(x) { lev = forderv(x, retGrp = TRUE, na.last = NA) diff --git a/R/fwrite.R b/R/fwrite.R index 5d91b4e34..07631e511 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -8,8 +8,8 @@ fwrite = function(x, file="", append=FALSE, quote="auto", dateTimeAs = c("ISO","squash","epoch","write.csv"), buffMB=8L, nThread=getDTthreads(verbose), showProgress=getOption("datatable.showProgress", interactive()), - compress = c("auto", "none", "gzip"), - compressLevel = 6L, + compress = c("auto", "none", "gzip", "zstd"), + compressLevel = NULL, yaml = FALSE, bom = FALSE, verbose=getOption("datatable.verbose", FALSE), @@ -26,7 +26,6 @@ fwrite = function(x, file="", append=FALSE, quote="auto", scipen = if (is.numeric(scipen)) as.integer(scipen) else 0L buffMB = as.integer(buffMB) nThread = as.integer(nThread) - compressLevel = as.integer(compressLevel) # write.csv default is 'double' so fwrite follows suit. 
write.table's default is 'escape'
   # validate arguments
   if (is.matrix(x)) { # coerce to data.table if input object is matrix
@@ -48,8 +47,7 @@ fwrite = function(x, file="", append=FALSE, quote="auto",
     `dec and sep must be distinct whenever both might be needed` = (!NROW(x) || NCOL(x) <= 1L || dec != sep), # sep2!=dec and sep2!=sep checked at C level when we know if list columns are present
     is.character(eol) && length(eol)==1L,
     length(qmethod) == 1L && qmethod %chin% c("double", "escape"),
-    length(compress) == 1L && compress %chin% c("auto", "none", "gzip"),
-    length(compressLevel) == 1L && 0L <= compressLevel && compressLevel <= 9L,
+    length(compress) == 1L && compress %chin% c("auto", "none", "gzip", "zstd"),
     isTRUEorFALSE(col.names), isTRUEorFALSE(append), isTRUEorFALSE(row.names),
     isTRUEorFALSE(verbose), isTRUEorFALSE(showProgress), isTRUEorFALSE(logical01),
     isTRUEorFALSE(bom), isTRUEorFALSE(forceDecimal),
@@ -60,6 +58,17 @@ fwrite = function(x, file="", append=FALSE, quote="auto",
   )
 
   is_gzip = compress == "gzip" || (compress == "auto" && endsWithAny(file, ".gz"))
+  is_zstd = compress == "zstd" || (compress == "auto" && endsWithAny(file, ".zst"))
+  if (is.null(compressLevel)) compressLevel = if (is_zstd) 3L else 6L
+  compressLevel = as.integer(compressLevel)
+  # the argument checks above no longer cover compressLevel: reject vectors and NA here so the range tests below always see one value
+  if (length(compressLevel) != 1L || is.na(compressLevel))
+    stopf("compressLevel must be NULL or a single non-NA integer.")
+  if (is_zstd) {
+    if (compressLevel < 1L || compressLevel > 22L) stopf("compressLevel must be between 1 and 22 for zstd compression.")
+  } else {
+    if (compressLevel < 0L || compressLevel > 9L) stopf("compressLevel must be between 0 and 9 for gzip compression.")
+  }
 
   file = path.expand(file) # "~/foo/bar"
   if (append && (file=="" || file.exists(file))) {
@@ -123,8 +132,9 @@ fwrite = function(x, file="", append=FALSE, quote="auto",
   }
   .Call(CfwriteR, x, file, sep, sep2, eol, na, dec, quote, qmethod=="escape", append,
         row.names, col.names, logical01, scipen, dateTimeAs, buffMB, nThread,
-        showProgress, is_gzip, compressLevel, bom, yaml, verbose, encoding, forceDecimal)
+        showProgress, is_gzip, is_zstd, compressLevel,
bom, yaml, verbose, encoding, forceDecimal) invisible() } haszlib = function() .Call(Cdt_has_zlib) +haszstd = function() .Call(Cdt_has_zstd) diff --git a/configure b/configure index 44c1df2c4..9132a6dc9 100755 --- a/configure +++ b/configure @@ -175,7 +175,7 @@ sed -e "s|@PKG_CFLAGS@|$PKG_CFLAGS|" -e "s|@PKG_LIBS@|$PKG_LIBS|" src/Makevars.i # optional dependency on zlib if [ "$NOZLIB" = "1" ]; then - echo "*** Compilation without compression support in fwrite" + echo "*** Compilation without gzip compression support in fwrite" sed -e "s|@zlib_cflags@|-DNOZLIB|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars sed -e "s|@zlib_libs@||" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars else @@ -183,4 +183,30 @@ else sed -e "s|@zlib_libs@|${lib}|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars fi +# optional dependency on zstd +NOZSTD=1 +pkg-config --exists libzstd +if [ $? -ne 0 ]; then + echo "*** pkg-config cannot find libzstd; zstd compression will be disabled." 
+ echo "*** To enable zstd support, install the zstd development library:" + echo "*** deb: libzstd-dev (Debian, Ubuntu, ...)" + echo "*** rpm: libzstd-devel (Fedora, EPEL, ...)" + echo "*** brew: zstd (macOS)" +else + NOZSTD=0 + zstd_lib=`pkg-config --libs libzstd` + zstd_cflag=`pkg-config --cflags libzstd` + zstd_version=`pkg-config --modversion libzstd` + echo "zstd ${zstd_version} is available ok" +fi + +if [ "$NOZSTD" = "1" ]; then + echo "*** Compilation without zstd compression support in fwrite/fread" + sed -e "s|@zstd_cflags@|-DNOZSTD|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars + sed -e "s|@zstd_libs@||" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars +else + sed -e "s|@zstd_cflags@|${zstd_cflag}|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars + sed -e "s|@zstd_libs@|${zstd_lib}|" src/Makevars > src/Makevars.tmp && mv src/Makevars.tmp src/Makevars +fi + exit 0 diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index b73b2767a..9e18f82ff 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -85,6 +85,7 @@ if (exists("test.data.table", .GlobalEnv, inherits=FALSE)) { which.last = data.table:::which.last `-.IDate` = data.table:::`-.IDate` haszlib = data.table:::haszlib + haszstd = data.table:::haszstd # Also, for functions that are masked by other packages, we need to map the data.table one. Or else, # the other package's function would be picked up. 
As above, we only need to do this because we desire @@ -9767,6 +9768,29 @@ if (haszlib()) { unlink(f) } +# fwrite/fread zstd compress # mirror tests from above +if (!haszstd()) { + test(1658.601, fwrite(data.table(a=1), file=tempfile(), compress="zstd"), error="header files were not found at the time data.table was compiled") +} else { + test(1658.61, fwrite(data.table(a=c(1:3), b=c(1:3)), compress="zstd"), output='a,b\n1,1\n2,2\n3,3') # compress ignored on console + DT = data.table(a=rep(1:2,each=100), b=rep(1:4,each=25)) + test(1658.621, fwrite(DT, file=f1<-tempfile(fileext=".zst"), verbose=TRUE), NULL, + output="args.nrow=200 args.ncol=2.*maxLineLen=5[12].*Writing 200 rows in 1 batches of 200 rows.*nth=1") + test(1658.622, fwrite(DT, file=f2<-tempfile()), NULL) + test(1658.623, file.info(f1)$size < file.info(f2)$size) + test(1658.63, fread(f1), DT) + fwrite(DT, file=f3<-tempfile(), compress="zstd") + fwrite(DT, file=f4<-tempfile(), compress="zstd", compressLevel=1) + fwrite(DT, file=f5<-tempfile(), compress="zstd", compressLevel=22) + fwrite(DT, file=f6<-tempfile(), compress="zstd", col.names=FALSE) + test(1658.641, file.info(f3)$size, file.info(f1)$size) + test(1658.642, file.info(f4)$size >= file.info(f1)$size) + test(1658.643, file.info(f1)$size >= file.info(f5)$size) + test(1658.644, fread(f6, col.names=c("a","b")), DT) + test(1658.645, fread(f3), DT) + unlink(c(f1,f2,f3,f4,f5,f6)) +} + # complex column support for fwrite, part of #3690 DT = data.table(a=1:3, z=0:2 - (2:0)*1i) test(1658.55, fwrite(DT), output='a,z\n1,0-2i\n2,1-1i\n3,2+0i') @@ -18481,6 +18505,15 @@ if (test_R.utils) { R.utils::bzip2(tmp, tmp2 <- tempfile(), remove=FALSE) # _not_ with .bz2 extension test(2273.4, fread(tmp2), DT) file.remove(tmp2) + con = xzfile(tmp2 <- tempfile(fileext=".xz"), open="wt") + write.table(DT, con, sep = ",", row.names = FALSE, col.names = TRUE, quote = FALSE) + close(con) + test(2273.5, fread(tmp2), DT) + con = xzfile(tmp3 <- tempfile(), open="wt") + 
write.table(DT, con, sep = ",", row.names = FALSE, col.names = TRUE, quote = FALSE) + close(con) + test(2273.6, fread(tmp3), DT) + file.remove(c(tmp2, tmp3)) } file.remove(tmp) diff --git a/man/fwrite.Rd b/man/fwrite.Rd index 078d22534..434ee29ac 100644 --- a/man/fwrite.Rd +++ b/man/fwrite.Rd @@ -16,8 +16,8 @@ fwrite(x, file = "", append = FALSE, quote = "auto", dateTimeAs = c("ISO","squash","epoch","write.csv"), buffMB = 8L, nThread = getDTthreads(verbose), showProgress = getOption("datatable.showProgress", interactive()), - compress = c("auto", "none", "gzip"), - compressLevel = 6L, + compress = c("auto", "none", "gzip", "zstd"), + compressLevel = NULL, yaml = FALSE, bom = FALSE, verbose = getOption("datatable.verbose", FALSE), @@ -58,7 +58,7 @@ fwrite(x, file = "", append = FALSE, quote = "auto", \item{nThread}{The number of threads to use. Experiment to see what works best for your data on your hardware.} \item{showProgress}{ Display a progress meter on the console? Ignored when \code{file==""}. } - \item{compress}{If \code{compress = "auto"} and if \code{file} ends in \code{.gz} then output format is gzipped csv else csv. If \code{compress = "none"}, output format is always csv. If \code{compress = "gzip"} then format is gzipped csv. Output to the console is never gzipped even if \code{compress = "gzip"}. By default, \code{compress = "auto"}.} - \item{compressLevel}{Level of compression between 1 and 9, 6 by default. See \url{https://www.gnu.org/software/gzip/manual/html_node/Invoking-gzip.html} for details.} + \item{compress}{If \code{compress = "auto"} then the output format is chosen from the extension of \code{file}: gzipped csv if it ends in \code{.gz}, zstd-compressed csv if it ends in \code{.zst}, otherwise plain csv. If \code{compress = "none"}, output format is always csv. If \code{compress = "gzip"} then format is gzipped csv, and if \code{compress = "zstd"} then format is zstd-compressed csv. Output to the console is never compressed even if \code{compress = "gzip"} or \code{compress = "zstd"}. By default, \code{compress = "auto"}.} + \item{compressLevel}{Level of compression. For gzip, an integer between 0 and 9 (default 6). For zstd, an integer between 1 and 22 (default 3). When \code{NULL} (default), the format default is used. 
See \url{https://www.gnu.org/software/gzip/manual/html_node/Invoking-gzip.html} or \url{https://facebook.github.io/zstd/zstd_manual.html} for details.} \item{yaml}{If \code{TRUE}, \code{fwrite} will output a CSVY file, that is, a CSV file with metadata stored as a YAML header, using \code{\link[yaml]{as.yaml}}. See \code{Details}. } \item{bom}{If \code{TRUE} a BOM (Byte Order Mark) sequence (EF BB BF) is added at the beginning of the file; format 'UTF-8 with BOM'.} \item{verbose}{Be chatty and report timings?} diff --git a/src/Makevars.in b/src/Makevars.in index 500427c22..eef6b2f98 100644 --- a/src/Makevars.in +++ b/src/Makevars.in @@ -1,5 +1,5 @@ -PKG_CFLAGS = $(C_VISIBILITY) @PKG_CFLAGS@ @zlib_cflags@ -PKG_LIBS = $(C_VISIBILITY) @PKG_LIBS@ @zlib_libs@ +PKG_CFLAGS = $(C_VISIBILITY) @PKG_CFLAGS@ @zlib_cflags@ @zstd_cflags@ +PKG_LIBS = $(C_VISIBILITY) @PKG_LIBS@ @zlib_libs@ @zstd_libs@ # See WRE $1.2.1.1. But retain user supplied PKG_* too, #4664. # WRE states ($1.6) that += isn't portable and that we aren't allowed to use it. # Otherwise we could use the much simpler PKG_LIBS += @openmp_cflags@ -lz. 
diff --git a/src/data.table.h b/src/data.table.h index c561b1732..2f976e200 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -414,7 +414,7 @@ SEXP chmatchdup_R(SEXP, SEXP, SEXP); SEXP chin_R(SEXP, SEXP); SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP spillConnectionToFile(SEXP, SEXP, SEXP); -SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rbindlist(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP setlistelt(SEXP, SEXP, SEXP); SEXP setS4elt(SEXP, SEXP, SEXP); @@ -469,6 +469,9 @@ SEXP allNAR(SEXP); SEXP test_dt_win_snprintf(void); SEXP dt_zlib_version(void); SEXP dt_has_zlib(void); +SEXP dt_zstd_version(void); +SEXP dt_has_zstd(void); +SEXP dt_zstd_decompress(SEXP, SEXP); SEXP startsWithAny(SEXP, SEXP, SEXP); SEXP convertDate(SEXP, SEXP); SEXP fastmean(SEXP); diff --git a/src/fwrite.c b/src/fwrite.c index 670936423..13c8e87ab 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -10,6 +10,10 @@ #ifndef NOZLIB #include // for compression to .gz #endif +#ifndef NOZSTD +#include // for compression to .zst +#include // for ZSTD_ErrorCode, ZSTD_getErrorString +#endif #ifdef WIN32 #include @@ -41,7 +45,7 @@ static bool qmethodEscape=false; // when quoting fields, how to escape dou static int scipen; static bool squashDateTime=false; // 0=ISO(yyyy-mm-dd) 1=squash(yyyymmdd) static bool verbose=false; -static int gzip_level; +static int compress_level; static bool forceDecimal=false; // force writing decimal points for numeric columns extern const char *getString(const void *, int64_t); @@ -588,7 +592,7 @@ int init_stream(z_stream *stream) { // Now we manage header and 
trailer. gzip file is slighty lower with -15 because no header/trailer are // written for each chunk. // For memLevel, 8 is the default value (128 KiB). memLevel=9 uses maximum memory for optimal speed. To be tested ? - int err = deflateInit2(stream, gzip_level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); + int err = deflateInit2(stream, compress_level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); return err; // # nocov } @@ -629,7 +633,7 @@ void fwriteMain(fwriteMainArgs args) doQuote = args.doQuote; int8_t quoteHeaders = args.doQuote; verbose = args.verbose; - gzip_level = args.gzip_level; + compress_level = args.compress_level; forceDecimal = args.forceDecimal; size_t len; @@ -640,6 +644,10 @@ void fwriteMain(fwriteMainArgs args) if (args.is_gzip) STOP(_("Compression in fwrite uses zlib library. Its header files were not found at the time data.table was compiled. To enable fwrite compression, please reinstall data.table and study the output for further guidance.")); // # nocov #endif +#ifdef NOZSTD + if (args.is_zstd) + STOP(_("Compression in fwrite uses zstd library. Its header files were not found at the time data.table was compiled. To enable zstd compression, please reinstall data.table and study the output for further guidance.")); // # nocov +#endif // When NA is a non-empty string, then we must quote all string fields in case they contain the na string // na is recommended to be empty, though @@ -721,6 +729,7 @@ void fwriteMain(fwriteMainArgs args) if (*args.filename == '\0') { f = -1; // file="" means write to standard output args.is_gzip = false; // gzip is only for file + args.is_zstd = false; // zstd is only for file } else { #ifdef WIN32 f = _open(args.filename, _O_WRONLY | _O_BINARY | _O_CREAT | (args.append ? 
_O_APPEND : _O_TRUNC), _S_IWRITE);
@@ -811,13 +820,14 @@ void fwriteMain(fwriteMainArgs args)
              buffSize / MEGA, nth, errno, strerror(errno)); // # nocov
   }
 
-  // init compress variables
+  // init compress variables (shared across gzip and zstd, so declared outside #ifndef NOZLIB: they are used unconditionally below and must exist in a -DNOZLIB build)
+  // NB: fine to free() this even if unallocated (free(NULL) is safe)
+  char *zbuffPool = NULL;
+  size_t zbuffSize = 0;
+  size_t compress_len = 0;
 #ifndef NOZLIB
   z_stream strm;
-  // NB: fine to free() this even if unallocated
-  char *zbuffPool = NULL;
-  size_t zbuffSize = 0;
-  size_t compress_len = 0;
+
   if (args.is_gzip) {
     // compute zbuffSize which is the same for each thread
     if (init_stream(&strm) != Z_OK) {
@@ -829,25 +839,40 @@ void fwriteMain(fwriteMainArgs args)
     zbuffSize = deflateBound(&strm, buffSize);
     if (verbose)
       DTPRINT(_("zbuffSize=%d returned from deflateBound\n"), (int)zbuffSize);
+  }
+#endif // #NOZLIB
+
+#ifndef NOZSTD
+  if (args.is_zstd) {
+    zbuffSize = ZSTD_compressBound(buffSize);
+    if (verbose)
+      DTPRINT(_("zbuffSize=%d returned from ZSTD_compressBound\n"), (int)zbuffSize);
+  }
+#endif // #NOZSTD
 
-    // alloc nth zlib buffers
+  if (zbuffSize) {
+    // alloc nth compressed-output buffers (one per thread)
     // if headerLen > nth * zbuffSize (long variable names and 1 thread), alloc headerLen
     alloc_size = nth * zbuffSize < headerLen ? headerLen : nth * zbuffSize;
-    if (verbose) {
+    if (verbose)
       DTPRINT(_("Allocate %zu bytes (%zu MiB) for zbuffPool\n"), alloc_size, alloc_size / MEGA);
-    }
     zbuffPool = malloc(alloc_size);
     if (!zbuffPool) {
       // # nocov start
       free(buffPool);
-      deflateEnd(&strm);
+#ifndef NOZLIB
+      if (args.is_gzip) deflateEnd(&strm);
+#endif
       STOP(_("Unable to allocate %zu MiB * %d thread compressed buffers; '%d: %s'. 
Please read ?fwrite for nThread, buffMB and verbose options."), zbuffSize / MEGA, nth, errno, strerror(errno)); // # nocov end } + } + +#ifndef NOZLIB + if (args.is_gzip) { len = 0; crc = crc32(0L, Z_NULL, 0); - if (f != -1) { // write minimal gzip header, but not on the console static const char header[] = "\037\213\10\0\0\0\0\0\0\3"; @@ -868,6 +893,11 @@ void fwriteMain(fwriteMainArgs args) } #endif // #NOZLIB +#ifndef NOZSTD + if (args.is_zstd) + len = 0; +#endif + // write header // use buffPool and zbuffPool because we ensure that allocation is minimum headerLen @@ -912,7 +942,6 @@ void fwriteMain(fwriteMainArgs args) #ifndef NOZLIB if (args.is_gzip) { char* zbuff = zbuffPool; - size_t zbuffUsed = zbuffSize; len = (size_t)(ch - buff); crc = crc32(crc, (unsigned char*)buff, len); @@ -922,21 +951,35 @@ void fwriteMain(fwriteMainArgs args) ret2 = WRITE(f, zbuff, (int)zbuffUsed); compress_len += zbuffUsed; } - } else { + } else #endif - ret2 = WRITE(f, buff, (int)(ch - buff)); -#ifndef NOZLIB - } +#ifndef NOZSTD + if (args.is_zstd) { + char* zbuff = zbuffPool; + size_t mylen_hdr = (size_t)(ch - buff); + size_t zbuffUsed = ZSTD_compress(zbuff, zbuffSize, buff, mylen_hdr, compress_level); + if (ZSTD_isError(zbuffUsed)) { + ret1 = (int)ZSTD_getErrorCode(zbuffUsed); // # nocov + } else { + len += mylen_hdr; + ret2 = WRITE(f, zbuff, (int)zbuffUsed); + compress_len += zbuffUsed; + } + } else #endif + { + ret2 = WRITE(f, buff, (int)(ch - buff)); + } if (ret1 || ret2 == -1) { // # nocov start int errwrite = errno; // capture write errno now in case close fails with a different errno CLOSE(f); free(buffPool); -#ifndef NOZLIB free(zbuffPool); +#ifndef NOZLIB + if (args.is_gzip) deflateEnd(&strm); #endif - if (ret1) STOP(_("Failed to compress gzip. compressbuff() returned %d"), ret1); + if (ret1) STOP(_("Failed to compress header. 
Error code: %d"), ret1); else STOP(_("%s: '%s'"), strerror(errwrite), args.filename); // # nocov end } @@ -956,9 +999,7 @@ void fwriteMain(fwriteMainArgs args) if (verbose) DTPRINT(_("No data rows present (nrow==0)\n")); free(buffPool); -#ifndef NOZLIB - free(zbuffPool); -#endif + free(zbuffPool); // free(NULL) is safe if (f != -1 && CLOSE(f)) STOP(_("%s: '%s'"), strerror(errno), args.filename); // # nocov return; @@ -983,12 +1024,13 @@ void fwriteMain(fwriteMainArgs args) char* myBuff = buffPool + me * buffSize; char* ch = myBuff; + // shared compression output pointer (used by both gzip and zstd) + void *myzBuff = NULL; + size_t myzbuffUsed = 0; #ifndef NOZLIB z_stream mystream; size_t mylen = 0; int mycrc = 0; - void *myzBuff = NULL; - size_t myzbuffUsed = 0; if (args.is_gzip) { myzBuff = zbuffPool + me * zbuffSize; if (init_stream(&mystream) != Z_OK) { // this should be thread safe according to zlib documentation @@ -996,6 +1038,12 @@ void fwriteMain(fwriteMainArgs args) my_failed_compress = -998; // # nocov } } +#endif +#ifndef NOZSTD + size_t myzstd_uncompressed = 0; + if (args.is_zstd) { + myzBuff = zbuffPool + me * zbuffSize; + } #endif if (failed) continue; // Not break. Because we don't use #omp cancel yet. 
@@ -1047,6 +1095,20 @@ void fwriteMain(fwriteMainArgs args) } } #endif +#ifndef NOZSTD + if (args.is_zstd && !failed) { + myzstd_uncompressed = (size_t)(ch - myBuff); + size_t zstd_result = ZSTD_compress(myzBuff, zbuffSize, myBuff, myzstd_uncompressed, compress_level); + if (ZSTD_isError(zstd_result)) { + failed = true; + my_failed_compress = (int)ZSTD_getErrorCode(zstd_result); + myzbuffUsed = 0; + myzstd_uncompressed = 0; + } else { + myzbuffUsed = zstd_result; + } + } +#endif // ordered region ---- #pragma omp ordered @@ -1067,6 +1129,12 @@ void fwriteMain(fwriteMainArgs args) ret = WRITE(f, myzBuff, (int)myzbuffUsed); compress_len += myzbuffUsed; } else +#endif +#ifndef NOZSTD + if (args.is_zstd) { + ret = WRITE(f, myzBuff, (int)myzbuffUsed); + compress_len += myzbuffUsed; + } else #endif { ret = WRITE(f, myBuff, (int)(ch - myBuff)); @@ -1082,6 +1150,11 @@ void fwriteMain(fwriteMainArgs args) len += mylen; } #endif +#ifndef NOZSTD + if (args.is_zstd) { + len += myzstd_uncompressed; + } +#endif int used = 100 * ((double)(ch - myBuff)) / buffSize; // percentage of original buffMB if (used > maxBuffUsedPC) @@ -1112,10 +1185,9 @@ void fwriteMain(fwriteMainArgs args) } // end of parallel for loop free(buffPool); + free(zbuffPool); // free(NULL) is safe #ifndef NOZLIB - free(zbuffPool); - /* put a 4-byte integer into a byte array in LSB order */ #define PUT4(a,b) ((a)[0] = (b), (a)[1] = (b) >> 8, (a)[2] = (b) >> 16, (a)[3] = (b) >> 24) @@ -1132,6 +1204,7 @@ void fwriteMain(fwriteMainArgs args) STOP(_("Failed to write gzip trailer")); // # nocov } #endif + // no trailer needed for zstd: each chunk is a complete self-contained frame // Finished parallel region and can call R API safely now. if (hasPrinted) { @@ -1146,12 +1219,15 @@ void fwriteMain(fwriteMainArgs args) } if (verbose) { + if (args.is_gzip || args.is_zstd) { + DTPRINT(_("%s: uncompressed length=%zu (%zu MiB), compressed length=%zu (%zu MiB), ratio=%.1f%%"), + args.is_gzip ? 
"zlib" : "zstd", // # notranslate + len, len / MEGA, compress_len, compress_len / MEGA, len != 0 ? (100.0 * compress_len) / len : 0); #ifndef NOZLIB - if (args.is_gzip) { - DTPRINT(_("zlib: uncompressed length=%zu (%zu MiB), compressed length=%zu (%zu MiB), ratio=%.1f%%, crc=%x\n"), - len, len / MEGA, compress_len, compress_len / MEGA, len != 0 ? (100.0 * compress_len) / len : 0, crc); - } + if (args.is_gzip) DTPRINT(_(", crc=%x"), crc); #endif + DTPRINT("\n"); // # notranslate + } DTPRINT(Pl_(nth, Pl_(args.nrow, "Wrote %"PRId64" row in %.3f secs using %d thread. MaxBuffUsed=%d%%\n", "Wrote %"PRId64" rows in %.3f secs using %d thread. MaxBuffUsed=%d%%\n"), Pl_(args.nrow, "Wrote %"PRId64" row in %.3f secs using %d threads. MaxBuffUsed=%d%%\n", @@ -1168,11 +1244,18 @@ void fwriteMain(fwriteMainArgs args) if (failed) { // # nocov start #ifndef NOZLIB - if (failed_compress) + if (failed_compress && args.is_gzip) STOP(_("zlib %s (zlib.h %s) deflate() returned error %d Z_FINISH=%d Z_BLOCK=%d. %s"), zlibVersion(), ZLIB_VERSION, failed_compress, Z_FINISH, Z_BLOCK, verbose ? _("Please include the full output above and below this message in your data.table bug report.") : _("Please retry fwrite() with verbose=TRUE and include the full output with your data.table bug report.")); +#endif +#ifndef NOZSTD + if (failed_compress && args.is_zstd) + STOP(_("zstd %s compress failed (error code %d): %s. %s"), + ZSTD_versionString(), failed_compress, ZSTD_getErrorString((ZSTD_ErrorCode)failed_compress), + verbose ? 
_("Please include the full output above and below this message in your data.table bug report.") + : _("Please retry fwrite() with verbose=TRUE and include the full output with your data.table bug report.")); #endif if (failed_write) STOP("%s: '%s'", strerror(failed_write), args.filename); // # notranslate diff --git a/src/fwrite.h b/src/fwrite.h index 01b795b6f..51eb35954 100644 --- a/src/fwrite.h +++ b/src/fwrite.h @@ -109,7 +109,8 @@ typedef struct fwriteMainArgs int nth; bool showProgress; bool is_gzip; - int gzip_level; + bool is_zstd; + int compress_level; bool bom; const char *yaml; bool verbose; diff --git a/src/fwriteR.c b/src/fwriteR.c index b2d631fb3..76de47abc 100644 --- a/src/fwriteR.c +++ b/src/fwriteR.c @@ -168,7 +168,8 @@ SEXP fwriteR( SEXP nThread_Arg, SEXP showProgress_Arg, SEXP is_gzip_Arg, - SEXP gzip_level_Arg, + SEXP is_zstd_Arg, + SEXP compress_level_Arg, SEXP bom_Arg, SEXP yaml_Arg, SEXP verbose_Arg, @@ -180,7 +181,8 @@ SEXP fwriteR( fwriteMainArgs args = { 0 }; // { 0 } to quieten valgrind's uninitialized, #4639 args.is_gzip = LOGICAL(is_gzip_Arg)[0]; - args.gzip_level = INTEGER(gzip_level_Arg)[0]; + args.is_zstd = LOGICAL(is_zstd_Arg)[0]; + args.compress_level = INTEGER(compress_level_Arg)[0]; args.bom = LOGICAL(bom_Arg)[0]; args.yaml = CHAR(STRING_ELT(yaml_Arg, 0)); args.verbose = LOGICAL(verbose_Arg)[0]; diff --git a/src/init.c b/src/init.c index 93bef6290..fb58f5671 100644 --- a/src/init.c +++ b/src/init.c @@ -145,6 +145,9 @@ static const R_CallMethodDef callMethods[] = { {"Ctest_dt_win_snprintf", (DL_FUNC)&test_dt_win_snprintf, -1}, {"Cdt_zlib_version", (DL_FUNC)&dt_zlib_version, -1}, {"Cdt_has_zlib", (DL_FUNC)&dt_has_zlib, -1}, + {"Cdt_zstd_version", (DL_FUNC)&dt_zstd_version, -1}, + {"Cdt_has_zstd", (DL_FUNC)&dt_has_zstd, -1}, + {"Cdt_zstd_decompress", (DL_FUNC)&dt_zstd_decompress, 2}, {"Csubstitute_call_arg_namesR", (DL_FUNC)&substitute_call_arg_namesR, -1}, {"CstartsWithAny", (DL_FUNC)&startsWithAny, -1}, {"CconvertDate", 
(DL_FUNC)&convertDate, -1},
diff --git a/src/utils.c b/src/utils.c
index d92dd20f5..6e0c2a5f8 100644
--- a/src/utils.c
+++ b/src/utils.c
@@ -431,6 +431,106 @@ SEXP dt_has_zlib(void) {
 #endif
 }
 
+#ifndef NOZSTD
+#include <zstd.h>
+#include <stdio.h> // FILE, fopen, fread, fwrite, ferror, fclose
+#endif
+SEXP dt_zstd_version(void) {
+  char out[70];
+#ifndef NOZSTD
+  snprintf(out, sizeof(out), "ZSTD_VERSION_STRING==%s runtime==%s", ZSTD_VERSION_STRING, ZSTD_versionString()); // # notranslate
+#else
+  snprintf(out, sizeof(out), _("zstd header files were not found when data.table was compiled"));
+#endif
+  return ScalarString(mkChar(out));
+}
+SEXP dt_has_zstd(void) {
+#ifndef NOZSTD
+  return ScalarLogical(1);
+#else
+  return ScalarLogical(0);
+#endif
+}
+#ifndef NOZSTD
+// resources owned by one dt_zstd_decompress() call; NULL members have not been acquired (or were already released)
+typedef struct {
+  const char *infile;
+  const char *outfile;
+  FILE *fin;
+  FILE *fout;
+  void *buffIn;
+  void *buffOut;
+  ZSTD_DStream *dstream;
+} zstd_decompress_ctx;
+
+// cleanup handler passed to R_ExecWithCleanup: releases whatever zstd_decompress_impl acquired, including after an R error
+static void zstd_decompress_cleanup(void *data) {
+  zstd_decompress_ctx *ctx = (zstd_decompress_ctx *)data;
+  free(ctx->buffIn);
+  free(ctx->buffOut);
+  if (ctx->dstream) ZSTD_freeDStream(ctx->dstream);
+  if (ctx->fin) fclose(ctx->fin);
+  if (ctx->fout) fclose(ctx->fout);
+}
+
+// streams ctx->infile through ZSTD_decompressStream into ctx->outfile; raises an R error on any read, decode or write failure
+static SEXP zstd_decompress_impl(void *data) {
+  zstd_decompress_ctx *ctx = (zstd_decompress_ctx *)data;
+
+  ctx->fin = fopen(ctx->infile, "rb");
+  if (!ctx->fin) error(_("Cannot open input file for zstd decompression: '%s'"), ctx->infile);
+
+  ctx->fout = fopen(ctx->outfile, "wb");
+  if (!ctx->fout) error(_("Cannot open output file for zstd decompression: '%s'"), ctx->outfile);
+
+  const size_t inSize = ZSTD_DStreamInSize();
+  const size_t outSize = ZSTD_DStreamOutSize();
+  ctx->buffIn = malloc(inSize);
+  ctx->buffOut = malloc(outSize);
+  if (!ctx->buffIn || !ctx->buffOut) error(_("Failed to allocate buffers for zstd decompression")); // # nocov
+
+  ctx->dstream = ZSTD_createDStream();
+  if (!ctx->dstream) error(_("Failed to create zstd decompression stream")); // # nocov
+  size_t ret = ZSTD_initDStream(ctx->dstream);
+  if (ZSTD_isError(ret)) error(_("zstd decompression error: %s"), ZSTD_getErrorName(ret)); // # nocov
+
+  // ret holds the last ZSTD_decompressStream result: 0 means the current frame is complete, so a non-zero value at EOF means truncated input
+  ret = 0;
+  size_t nread;
+  while ((nread = fread(ctx->buffIn, 1, inSize, ctx->fin)) > 0) {
+    ZSTD_inBuffer input = { ctx->buffIn, nread, 0 };
+    while (input.pos < input.size) {
+      ZSTD_outBuffer output = { ctx->buffOut, outSize, 0 };
+      ret = ZSTD_decompressStream(ctx->dstream, &output, &input);
+      if (ZSTD_isError(ret))
+        error(_("zstd decompression error: %s"), ZSTD_getErrorName(ret));
+      if (fwrite(ctx->buffOut, 1, output.pos, ctx->fout) != output.pos)
+        error(_("Failed to write decompressed data to '%s'"), ctx->outfile); // # nocov
+    }
+  }
+  if (ferror(ctx->fin))
+    error(_("Failed to read input file for zstd decompression: '%s'"), ctx->infile); // # nocov
+  if (ret != 0)
+    error(_("zstd decompression error: input file '%s' ends before the end of a zstd frame (truncated file?)"), ctx->infile);
+  // close the output here rather than only in the cleanup handler so that a failed flush of buffered data (e.g. disk full) is reported; clear the member first so cleanup does not close it twice
+  FILE *fout = ctx->fout;
+  ctx->fout = NULL;
+  if (fclose(fout) != 0)
+    error(_("Failed to write decompressed data to '%s'"), ctx->outfile); // # nocov
+  return R_NilValue;
+}
+#endif
+
+SEXP dt_zstd_decompress(SEXP infile_sexp, SEXP outfile_sexp) {
+#ifndef NOZSTD
+  zstd_decompress_ctx ctx = { CHAR(STRING_ELT(infile_sexp, 0)), CHAR(STRING_ELT(outfile_sexp, 0)), NULL, NULL, NULL, NULL, NULL };
+  return R_ExecWithCleanup(zstd_decompress_impl, &ctx, zstd_decompress_cleanup, &ctx);
+#else
+  error(_("zstd header files were not found when data.table was compiled")); // # nocov
+  return R_NilValue; // # nocov
+#endif
+}
+
 SEXP startsWithAny(const SEXP x, const SEXP y, SEXP start) {
   // for is_url in fread.R added in #5097
   // basically any(startsWith()), short and simple ascii-only