diff --git a/src/worker/script.rs b/src/worker/script.rs index bcd1ac8..18463a6 100644 --- a/src/worker/script.rs +++ b/src/worker/script.rs @@ -8,6 +8,7 @@ use std::{ io::prelude::*, net::{Shutdown, TcpStream}, process::Command, + sync::Arc, thread, time, }; @@ -22,6 +23,7 @@ use llvm::execution_engine::*; use llvm::target::*; use llvm_sys::LLVMType; use llvm_sys::prelude::*; +use std::cell::RefCell; use std::ffi::{CStr, CString, c_void}; use std::mem; @@ -129,6 +131,11 @@ pub unsafe extern "C" fn task(name: *const i8, random: bool) -> u64 { 0 } +thread_local! { + static LAST_RANDOM_PATH: RefCell = + RefCell::new(CString::new("").unwrap()); +} + /// Return a randomly generated path. /// /// # Safety @@ -144,7 +151,10 @@ pub unsafe extern "C" fn random_path(base: *const i8) -> *const i8 { .map(char::from) .collect(); - CString::new(format!("{base}/{uniq}")).unwrap().into_raw() + LAST_RANDOM_PATH.with(|last| { + *last.borrow_mut() = CString::new(format!("{base}/{uniq}")).unwrap(); + last.borrow().as_ptr() + }) } #[derive(Debug, Clone)] @@ -506,21 +516,43 @@ impl Worker for ScriptWorker { debug!("Distribution {:?}", d); let Dist::Exp { rate } = d else { todo!() }; - loop { - let worker = self.clone(); - thread::spawn(move || { - (worker.jit)(); - }); - - let interval: f64 = - thread_rng().sample(Exp::new(*rate).unwrap()); - debug!("Interval {}", interval); - thread::sleep(time::Duration::from_secs_f64(interval)); - } + const MAX_CONCURRENT: usize = 16; + let semaphore = Arc::new(( + std::sync::Mutex::new(0usize), + std::sync::Condvar::new(), + )); + + thread::scope(|s| { + loop { + { + let (lock, cvar) = &*semaphore; + let mut count = cvar + .wait_while(lock.lock().unwrap(), |c| { + *c >= MAX_CONCURRENT + }) + .unwrap(); + *count += 1; + } + + let worker = self.clone(); + let sem = Arc::clone(&semaphore); + s.spawn(move || { + (worker.jit)(); + let (lock, cvar) = &*sem; + *lock.lock().unwrap() -= 1; + cvar.notify_one(); + }); + + let interval: f64 = + thread_rng().sample(Exp::new(*rate).unwrap()); + debug!("Interval {}", interval); + thread::sleep(time::Duration::from_secs_f64(interval)); + } + }); } None => { debug!("Single unit"); - (self.jit)() + (self.jit)(); } };