diff --git a/Cargo.lock b/Cargo.lock index 40819faf..15e40bdb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -217,6 +217,31 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "ctor" version = "0.8.0" @@ -324,6 +349,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "equivalent" version = "1.0.2" @@ -386,6 +417,7 @@ dependencies = [ "filetime", "nix 0.31.2", "onig", + "rayon", "regex", "tempfile", "uucore", @@ -842,6 +874,26 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" +[[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.7.4" @@ -1625,3 +1677,443 @@ name = "zmij" version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" + +[[patch.unused]] +name = "clippy" +version = "0.1.96" + +[[patch.unused]] +name = "clippy_config" +version = "0.1.96" + +[[patch.unused]] +name = "clippy_dev" +version = "0.0.1" + +[[patch.unused]] +name = "clippy_dummy" +version = "0.0.303" + +[[patch.unused]] +name = "clippy_lints" +version = "0.1.96" + +[[patch.unused]] +name = "clippy_lints_internal" +version = "0.0.1" + +[[patch.unused]] +name = "clippy_test_deps" +version = "0.1.0" + +[[patch.unused]] +name = "clippy_utils" +version = "0.1.96" + +[[patch.unused]] +name = "cmd_lib" +version = "3.0.0" + +[[patch.unused]] +name = "cmd_lib_macros" +version = "2.0.0" + +[[patch.unused]] +name = "declare_clippy_lint" +version = "0.1.96" + +[[patch.unused]] +name = "gen-protos" +version = "0.40.0" + +[[patch.unused]] +name = "gitoxide" +version = "0.52.0" + +[[patch.unused]] +name = "gitoxide-core" +version = "0.55.0" + +[[patch.unused]] +name = "gix" +version = "0.81.0" + +[[patch.unused]] +name = "gix-actor" +version = "0.40.0" + +[[patch.unused]] +name = "gix-archive" +version = "0.30.0" + +[[patch.unused]] +name = "gix-attributes" +version = "0.31.0" + +[[patch.unused]] +name = "gix-attributes-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-bitmap" +version = "0.3.0" + +[[patch.unused]] +name = "gix-blame" +version = "0.11.0" + +[[patch.unused]] +name = "gix-chunk" +version = "0.7.0" + +[[patch.unused]] +name = "gix-command" +version = "0.8.0" + +[[patch.unused]] +name = "gix-commitgraph" +version = "0.35.0" + +[[patch.unused]] +name = "gix-commitgraph-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-config" +version = "0.54.0" + +[[patch.unused]] +name = "gix-config-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-config-tests" +version = "0.0.0" + +[[patch.unused]] +name = "gix-config-value" +version = "0.17.1" + +[[patch.unused]] +name = "gix-config-value-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-credentials" +version = "0.37.1" + +[[patch.unused]] +name = "gix-date" +version = "0.15.1" + +[[patch.unused]] +name = "gix-date-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-diff" +version = "0.61.0" + +[[patch.unused]] +name = "gix-diff-tests" +version = "0.0.0" + +[[patch.unused]] +name = "gix-dir" +version = "0.23.0" + +[[patch.unused]] +name = "gix-discover" +version = "0.49.0" + +[[patch.unused]] +name = "gix-error" +version = "0.2.1" + +[[patch.unused]] +name = "gix-features" +version = "0.46.2" + +[[patch.unused]] +name = "gix-fetchhead" +version = "0.0.0" + +[[patch.unused]] +name = "gix-filter" +version = "0.28.0" + +[[patch.unused]] +name = "gix-fs" +version = "0.19.2" + +[[patch.unused]] +name = "gix-fsck" +version = "0.19.0" + +[[patch.unused]] +name = "gix-glob" +version = "0.24.0" + +[[patch.unused]] +name = "gix-hash" +version = "0.23.0" + +[[patch.unused]] +name = "gix-hashtable" +version = "0.13.0" + +[[patch.unused]] +name = "gix-ignore" +version = "0.19.1" + +[[patch.unused]] +name = "gix-index" +version = "0.49.0" + +[[patch.unused]] +name = "gix-index-tests" +version = "0.0.0" + +[[patch.unused]] +name = "gix-lfs" +version = "0.0.0" + +[[patch.unused]] +name = "gix-lock" +version = "21.0.2" + +[[patch.unused]] +name = "gix-macros" +version = "0.1.5" + +[[patch.unused]] +name = "gix-mailmap" +version = "0.32.0" + +[[patch.unused]] +name = "gix-merge" +version = "0.14.0" + +[[patch.unused]] +name = "gix-merge-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-negotiate" +version = "0.29.0" + +[[patch.unused]] +name = "gix-note" +version = "0.0.0" + +[[patch.unused]] +name = "gix-object" +version = "0.58.0" + +[[patch.unused]] +name = "gix-object-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-odb" +version = "0.78.0" + +[[patch.unused]] +name = "gix-odb-tests" +version = "0.0.0" + +[[patch.unused]] +name = "gix-pack" +version = "0.68.0" + +[[patch.unused]] +name = "gix-pack-tests" +version = "0.0.0" + +[[patch.unused]] +name = "gix-packetline" +version = "0.21.2" + +[[patch.unused]] +name = "gix-path" +version = "0.11.2" + +[[patch.unused]] +name = "gix-pathspec" +version = "0.16.1" + +[[patch.unused]] +name = "gix-pathspec-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-prompt" +version = "0.14.1" + +[[patch.unused]] +name = "gix-protocol" +version = "0.59.0" + +[[patch.unused]] +name = "gix-quote" +version = "0.7.0" + +[[patch.unused]] +name = "gix-rebase" +version = "0.0.0" + +[[patch.unused]] +name = "gix-ref" +version = "0.61.0" + +[[patch.unused]] +name = "gix-ref-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-ref-tests" +version = "0.0.0" + +[[patch.unused]] +name = "gix-refspec" +version = "0.39.0" + +[[patch.unused]] +name = "gix-refspec-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-revision" +version = "0.43.0" + +[[patch.unused]] +name = "gix-revision-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-revwalk" +version = "0.29.0" + +[[patch.unused]] +name = "gix-sec" +version = "0.13.2" + +[[patch.unused]] +name = "gix-sequencer" +version = "0.0.0" + +[[patch.unused]] +name = "gix-shallow" +version = "0.10.0" + +[[patch.unused]] +name = "gix-status" +version = "0.28.0" + +[[patch.unused]] +name = "gix-status-tests" +version = "0.0.0" + +[[patch.unused]] +name = "gix-submodule" +version = "0.28.0" + +[[patch.unused]] +name = "gix-tempfile" +version = "21.0.2" + +[[patch.unused]] +name = "gix-testtools" +version = "0.19.0" + +[[patch.unused]] +name = "gix-tix" +version = "0.0.0" + +[[patch.unused]] +name = "gix-trace" +version = "0.1.18" + +[[patch.unused]] +name = "gix-transport" +version = "0.55.1" + +[[patch.unused]] +name = "gix-traverse" +version = "0.55.0" + +[[patch.unused]] +name = "gix-traverse-tests" +version = "0.0.0" + +[[patch.unused]] +name = "gix-tui" +version = "0.0.0" + +[[patch.unused]] +name = "gix-url" +version = "0.35.2" + +[[patch.unused]] +name = "gix-url-fuzz" +version = "0.0.0" + +[[patch.unused]] +name = "gix-utils" +version = "0.3.1" + +[[patch.unused]] +name = "gix-validate" +version = "0.11.0" + +[[patch.unused]] +name = "gix-worktree" +version = "0.50.0" + +[[patch.unused]] +name = "gix-worktree-state" +version = "0.28.0" + +[[patch.unused]] +name = "gix-worktree-state-tests" +version = "0.0.0" + +[[patch.unused]] +name = "gix-worktree-stream" +version = "0.30.0" + +[[patch.unused]] +name = "gix-worktree-tests" +version = "0.0.0" + +[[patch.unused]] +name = "jj-cli" +version = "0.40.0" + +[[patch.unused]] +name = "jj-lib" +version = "0.40.0" + +[[patch.unused]] +name = "jj-lib-proc-macros" +version = "0.40.0" + +[[patch.unused]] +name = "kitten" +version = "4.0.0" + +[[patch.unused]] +name = "lintcheck" +version = "0.0.1" + +[[patch.unused]] +name = "rtk" +version = "0.34.3" + +[[patch.unused]] +name = "rustc_tools_util" +version = "0.4.2" + +[[patch.unused]] +name = "testutils" +version = "0.40.0" diff --git a/Cargo.toml b/Cargo.toml index 884983bc..12cc7ad2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ authors = ["uutils developers"] [dependencies] argmax = "0.4.0" +rayon = "1" chrono = "0.4.44" clap = "4.6" faccess = "0.2.4" diff --git a/src/xargs/mod.rs b/src/xargs/mod.rs index f1a14cc4..ea87914a 100644 --- a/src/xargs/mod.rs +++ b/src/xargs/mod.rs @@ -12,9 +12,15 @@ use std::{ fs, io::{self, BufRead, BufReader, Read}, process::{Command, Stdio}, + sync::atomic::{AtomicU64, Ordering}, }; +static ACTIVE_PROCS: AtomicU64 = AtomicU64::new(0); + use clap::{crate_version, error::ErrorKind, Arg, ArgAction}; +use rayon::iter::{ + FromParallelIterator, IntoParallelIterator, ParallelBridge as _, ParallelIterator as _, +}; mod options { pub const COMMAND: &str = "COMMAND"; @@ -79,14 +85,14 @@ trait CommandSizeLimiter { arg: Argument, cursor: LimiterCursor<'_>, ) -> Result; - fn dyn_clone(&self) -> Box; + fn dyn_clone(&self) -> Box; } /// A pointer to the next limiter. A limiter should *always* call the cursor's /// `try_next` *before* updating its own state, to ensure that all other limiters /// are okay with the argument first. struct LimiterCursor<'collection> { - limiters: &'collection mut [Box], + limiters: &'collection mut [Box], } impl LimiterCursor<'_> { @@ -106,7 +112,7 @@ impl LimiterCursor<'_> { } struct LimiterCollection { - limiters: Vec>, + limiters: Vec>, } impl LimiterCollection { @@ -114,7 +120,7 @@ impl LimiterCollection { Self { limiters: vec![] } } - fn add(&mut self, limiter: impl CommandSizeLimiter + 'static) { + fn add(&mut self, limiter: impl CommandSizeLimiter + Send + Sync + 'static) { self.limiters.push(Box::new(limiter)); } @@ -208,7 +214,7 @@ impl CommandSizeLimiter for MaxCharsCommandSizeLimiter { } } - fn dyn_clone(&self) -> Box { + fn dyn_clone(&self) -> Box { Box::new(self.clone()) } } @@ -248,7 +254,7 @@ impl CommandSizeLimiter for MaxArgsCommandSizeLimiter { } } - fn dyn_clone(&self) -> Box { + fn dyn_clone(&self) -> Box { Box::new(self.clone()) } } @@ -292,21 +298,44 @@ impl CommandSizeLimiter for MaxLinesCommandSizeLimiter { } } - fn dyn_clone(&self) -> Box { + fn dyn_clone(&self) -> Box { Box::new(self.clone()) } } +#[derive(Default)] enum CommandResult { + #[default] Success, Failure, } -impl CommandResult { - fn combine(&mut self, other: Self) { - if matches!(*self, Self::Success) { - *self = other; - } +impl FromIterator for CommandResult { + fn from_iter>(iter: I) -> Self { + iter.into_iter() + .fold(None, |acc, item| { + acc.or_else(|| (!matches!(item, Self::Success)).then_some(item)) + }) + .unwrap_or_default() + } +} + +impl FromParallelIterator for CommandResult { + fn from_par_iter(par_iter: I) -> Self + where + I: IntoParallelIterator, + { + par_iter + .into_par_iter() + .fold( + || None, + |acc, item| acc.or_else(|| (!matches!(item, Self::Success)).then_some(item)), + ) + .collect_vec_list() + .into_iter() + .flatten() + .flatten() + .collect() } } @@ -486,6 +515,20 @@ impl CommandBuilder<'_> { } } } + + fn execute_parallel(self, capacity: u64) -> Result { + loop { + let current = ACTIVE_PROCS.fetch_add(1, Ordering::SeqCst); + if current < capacity { + break; + } + ACTIVE_PROCS.fetch_sub(1, Ordering::SeqCst); + std::hint::spin_loop(); + } + let result = self.execute(); + ACTIVE_PROCS.fetch_sub(1, Ordering::SeqCst); + result + } } trait ArgumentReader { @@ -642,13 +685,13 @@ where } struct EofArgumentReader { - reader: Box, + reader: Box, eof_delimiter: OsString, eof_found: bool, } impl EofArgumentReader { - fn new(reader: Box, eof_delimiter: &String) -> Self { + fn new(reader: Box, eof_delimiter: &String) -> Self { Self { reader, eof_delimiter: eof_delimiter.into(), @@ -724,6 +767,7 @@ struct InputProcessOptions { max_args: Option, max_lines: Option, no_run_if_empty: bool, + max_procs: std::num::NonZero, } impl InputProcessOptions { @@ -732,51 +776,119 @@ impl InputProcessOptions { max_args: Option, max_lines: Option, no_run_if_empty: bool, + max_procs: std::num::NonZero, ) -> Self { Self { exit_if_pass_char_limit, max_args, max_lines, no_run_if_empty, + max_procs, + } + } +} + +struct CommandBatchIter<'a> { + builder_options: &'a CommandBuilderOptions, + args: Box, + options: &'a InputProcessOptions, + current_builder: CommandBuilder<'a>, + have_pending_command: bool, + done: bool, +} + +impl<'a> CommandBatchIter<'a> { + fn new( + builder_options: &'a CommandBuilderOptions, + args: Box, + options: &'a InputProcessOptions, + ) -> Self { + Self { + current_builder: CommandBuilder::new(builder_options), + builder_options, + args, + options, + have_pending_command: false, + done: false, + } + } +} + +impl<'a> Iterator for CommandBatchIter<'a> { + type Item = Result, XargsError>; + + fn next(&mut self) -> Option { + if self.done { + return None; + } + loop { + match self.args.next() { + Err(e) => { + self.done = true; + return Some(Err(XargsError::from(e))); + } + Ok(None) => { + self.done = true; + if !self.options.no_run_if_empty || self.have_pending_command { + return Some(Ok(std::mem::replace( + &mut self.current_builder, + CommandBuilder::new(self.builder_options), + ))); + } + return None; + } + Ok(Some(arg)) => match self.current_builder.add_arg(arg) { + Ok(()) => { + self.have_pending_command = true; + } + Err(ExhaustedCommandSpace { arg, out_of_chars }) => { + if out_of_chars + && self.options.exit_if_pass_char_limit + && (self.options.max_args.is_some() || self.options.max_lines.is_some()) + { + self.done = true; + return Some(Err(XargsError::ArgumentTooLarge)); + } + let old = std::mem::replace( + &mut self.current_builder, + CommandBuilder::new(self.builder_options), + ); + let batch = self.have_pending_command.then_some(old); + if let Err(ExhaustedCommandSpace { .. }) = self.current_builder.add_arg(arg) + { + self.done = true; + return Some(Err(XargsError::ArgumentTooLarge)); + } + self.have_pending_command = true; + if let Some(ready) = batch { + return Some(Ok(ready)); + } + } + }, + } } } } fn process_input( builder_options: &CommandBuilderOptions, - mut args: Box, + args: Box, options: &InputProcessOptions, ) -> Result { - let mut current_builder = CommandBuilder::new(builder_options); - let mut have_pending_command = false; - let mut result = CommandResult::Success; - - while let Some(arg) = args.next()? { - if let Err(ExhaustedCommandSpace { arg, out_of_chars }) = current_builder.add_arg(arg) { - if out_of_chars - && options.exit_if_pass_char_limit - && (options.max_args.is_some() || options.max_lines.is_some()) - { - return Err(XargsError::ArgumentTooLarge); - } - if have_pending_command { - result.combine(current_builder.execute()?); - } + let batches = CommandBatchIter::new(builder_options, args, options); - current_builder = CommandBuilder::new(builder_options); - if let Err(ExhaustedCommandSpace { .. }) = current_builder.add_arg(arg) { - return Err(XargsError::ArgumentTooLarge); - } - } + let capacity = options.max_procs.get(); - have_pending_command = true; - } - - if !options.no_run_if_empty || have_pending_command { - result.combine(current_builder.execute()?); + if capacity == 1 { + batches + .map(|batch| batch?.execute().map_err(XargsError::from)) + .collect() + } else { + batches + .par_bridge() + .map(|batch| batch?.execute_parallel(capacity).map_err(XargsError::from)) + .collect() } - - Ok(result) } fn parse_delimiter(s: &str) -> Result { @@ -958,7 +1070,7 @@ fn do_xargs(args: &[&str]) -> Result { Arg::new(options::MAX_PROCS) .short('P') .long(options::MAX_PROCS) - .help("Run up to this many commands in parallel [NOT IMPLEMENTED]") + .help("Run up to this many commands in parallel") .value_parser(clap::value_parser!(usize)), ) .arg( @@ -1119,13 +1231,13 @@ fn do_xargs(args: &[&str]) -> Result { builder_options.verbose = options.verbose; builder_options.close_stdin = options.arg_file.is_none(); - let args_file: Box = if let Some(path) = &options.arg_file { + let args_file: Box = if let Some(path) = &options.arg_file { Box::new(fs::File::open(path).map_err(|e| format!("Failed to open {path}: {e}"))?) } else { Box::new(io::stdin()) }; - let mut args: Box = if let Some(delimiter) = options.delimiter { + let mut args: Box = if let Some(delimiter) = options.delimiter { Box::new(ByteDelimitedArgumentReader::new(args_file, delimiter)) } else { Box::new(WhitespaceDelimitedArgumentReader::new(args_file)) @@ -1135,6 +1247,14 @@ fn do_xargs(args: &[&str]) -> Result { args = Box::new(EofArgumentReader::new(args, &eof_delimiter)); } + let max_procs = + matches + .get_one::(options::MAX_PROCS) + .map_or(std::num::NonZero::::MIN, |&n| { + std::num::NonZero::new(n.try_into().expect("max-procs value exceeds u64 range")) + .expect("max-procs must be non-zero") + }); + let result = process_input( &builder_options, args, @@ -1143,6 +1263,7 @@ fn do_xargs(args: &[&str]) -> Result { options.max_args, options.max_lines, options.no_run_if_empty, + max_procs, ), )?; Ok(result) @@ -1210,7 +1331,7 @@ mod tests { }) } - fn dyn_clone(&self) -> Box { + fn dyn_clone(&self) -> Box { Box::new(self.clone()) } } @@ -1275,7 +1396,8 @@ mod tests { #[test] fn test_chars_limiter_asks_cursor() { - let mut rejects: [Box; 1] = [Box::new(AlwaysRejectLimiter)]; + let mut rejects: [Box; 1] = + [Box::new(AlwaysRejectLimiter)]; let reject_cursor = LimiterCursor { limiters: &mut rejects, }; @@ -1312,7 +1434,8 @@ mod tests { #[test] fn test_args_limiter_asks_cursor() { - let mut rejects: [Box; 1] = [Box::new(AlwaysRejectLimiter)]; + let mut rejects: [Box; 1] = + [Box::new(AlwaysRejectLimiter)]; let reject_cursor = LimiterCursor { limiters: &mut rejects, }; @@ -1358,7 +1481,8 @@ mod tests { #[test] fn test_lines_limiter_asks_cursor() { - let mut rejects: [Box; 1] = [Box::new(AlwaysRejectLimiter)]; + let mut rejects: [Box; 1] = + [Box::new(AlwaysRejectLimiter)]; let reject_cursor = LimiterCursor { limiters: &mut rejects, }; diff --git a/tests/test_xargs.rs b/tests/test_xargs.rs index ac9f63f4..c0d56a60 100644 --- a/tests/test_xargs.rs +++ b/tests/test_xargs.rs @@ -25,6 +25,24 @@ fn xargs_basics() { .stdout_only("abc def ghi i j \"k\n"); } +#[test] +fn xargs_parallel_one() { + ucmd() + .args(&["-P1"]) + .pipe_in("abc\ndef g\\hi 'i j \"k'") + .succeeds() + .stdout_only("abc def ghi i j \"k\n"); +} + +#[test] +fn xargs_parallel_many() { + ucmd() + .args(&["-P3"]) + .pipe_in("abc\ndef g\\hi 'i j \"k'") + .succeeds() + .stdout_only("abc def ghi i j \"k\n"); +} + #[test] fn xargs_null() { ucmd() @@ -296,6 +314,15 @@ fn xargs_exec_with_signal() { ); } +#[test] +fn xargs_exec_negative_parallel() { + ucmd() + .args(&["-P=-1"]) + .fails_with_code(1) + .stderr_contains("Error:") + .no_stdout(); +} + #[test] fn xargs_exec_not_found() { ucmd()