#![allow(dead_code)]
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
pub(super) struct Idle {
num_searching: AtomicUsize,
num_idle: AtomicUsize,
idle_map: IdleMap,
needs_searching: AtomicBool,
num_cores: usize,
}
pub(super) struct IdleMap {
chunks: Vec<AtomicUsize>,
}
pub(super) struct Snapshot {
chunks: Vec<usize>,
}
pub(super) struct Synced {
sleepers: Vec<usize>,
available_cores: Vec<Box<Core>>,
}
impl Idle {
pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) {
let idle = Idle {
num_searching: AtomicUsize::new(0),
num_idle: AtomicUsize::new(cores.len()),
idle_map: IdleMap::new(&cores),
needs_searching: AtomicBool::new(false),
num_cores: cores.len(),
};
let synced = Synced {
sleepers: Vec::with_capacity(num_workers),
available_cores: cores,
};
(idle, synced)
}
pub(super) fn needs_searching(&self) -> bool {
self.needs_searching.load(Acquire)
}
pub(super) fn num_idle(&self, synced: &Synced) -> usize {
#[cfg(not(loom))]
debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire));
synced.available_cores.len()
}
pub(super) fn num_searching(&self) -> usize {
self.num_searching.load(Acquire)
}
pub(super) fn snapshot(&self, snapshot: &mut Snapshot) {
snapshot.update(&self.idle_map)
}
pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
let ret = synced.available_cores.pop();
if let Some(core) = &ret {
let num_idle = self.num_idle.load(Acquire) - 1;
debug_assert_eq!(num_idle, synced.available_cores.len());
self.num_idle.store(num_idle, Release);
self.idle_map.unset(core.index);
debug_assert!(self.idle_map.matches(&synced.available_cores));
}
ret
}
pub(super) fn notify_local(&self, shared: &Shared) {
if self.num_searching.load(Acquire) != 0 {
return;
}
if self.num_idle.load(Acquire) == 0 {
self.needs_searching.store(true, Release);
return;
}
if self
.num_searching
.compare_exchange(0, 1, AcqRel, Acquire)
.is_err()
{
return;
}
super::counters::inc_num_unparks_local();
let synced = shared.synced.lock();
self.notify_synced(synced, shared);
}
pub(super) fn notify_remote(&self, synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
if synced.idle.sleepers.is_empty() {
self.needs_searching.store(true, Release);
return;
}
self.num_searching.fetch_add(1, AcqRel);
self.notify_synced(synced, shared);
}
fn notify_synced(&self, mut synced: MutexGuard<'_, worker::Synced>, shared: &Shared) {
if let Some(worker) = synced.idle.sleepers.pop() {
if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) {
debug_assert!(!core.is_searching);
core.is_searching = true;
synced.assigned_cores[worker] = Some(core);
drop(synced);
super::counters::inc_num_unparks_remote();
shared.condvars[worker].notify_one();
return;
} else {
synced.idle.sleepers.push(worker);
}
}
super::counters::inc_notify_no_core();
self.needs_searching.store(true, Release);
self.num_searching.fetch_sub(1, Release);
drop(synced);
}
pub(super) fn notify_mult(
&self,
synced: &mut worker::Synced,
workers: &mut Vec<usize>,
num: usize,
) {
debug_assert!(workers.is_empty());
for _ in 0..num {
if let Some(worker) = synced.idle.sleepers.pop() {
if let Some(core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);
self.idle_map.unset(core.index);
synced.assigned_cores[worker] = Some(core);
workers.push(worker);
continue;
} else {
synced.idle.sleepers.push(worker);
}
}
break;
}
if !workers.is_empty() {
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
let num_idle = synced.idle.available_cores.len();
self.num_idle.store(num_idle, Release);
} else {
#[cfg(not(loom))]
debug_assert_eq!(
synced.idle.available_cores.len(),
self.num_idle.load(Acquire)
);
self.needs_searching.store(true, Release);
}
}
pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) {
while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
let worker = synced.idle.sleepers.pop().unwrap();
let core = self.try_acquire_available_core(&mut synced.idle).unwrap();
synced.assigned_cores[worker] = Some(core);
shared.condvars[worker].notify_one();
}
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
while let Some(index) = synced.idle.sleepers.pop() {
shared.condvars[index].notify_one();
}
}
pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) {
while let Some(core) = {
let mut synced = shared.synced.lock();
self.try_acquire_available_core(&mut synced.idle)
} {
shared.shutdown_core(handle, core);
}
}
pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
debug_assert!(!core.is_searching);
debug_assert!(synced.inject.is_empty());
let num_idle = synced.idle.available_cores.len();
#[cfg(not(loom))]
debug_assert_eq!(num_idle, self.num_idle.load(Acquire));
self.idle_map.set(core.index);
synced.idle.available_cores.push(core);
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
self.num_idle.store(num_idle + 1, Release);
}
pub(super) fn transition_worker_to_parked(&self, synced: &mut worker::Synced, index: usize) {
synced.idle.sleepers.push(index);
debug_assert!(synced.assigned_cores[index].is_none());
}
pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) {
debug_assert!(!core.is_searching);
let num_searching = self.num_searching.load(Acquire);
let num_idle = self.num_idle.load(Acquire);
if 2 * num_searching >= self.num_cores - num_idle {
return;
}
self.transition_worker_to_searching(core);
}
pub(super) fn transition_worker_to_searching_if_needed(
&self,
_synced: &mut Synced,
core: &mut Core,
) -> bool {
if self.needs_searching.load(Acquire) {
self.transition_worker_to_searching(core);
true
} else {
false
}
}
pub(super) fn transition_worker_to_searching(&self, core: &mut Core) {
core.is_searching = true;
self.num_searching.fetch_add(1, AcqRel);
self.needs_searching.store(false, Release);
}
pub(super) fn transition_worker_from_searching(&self) -> bool {
let prev = self.num_searching.fetch_sub(1, AcqRel);
debug_assert!(prev > 0);
prev == 1
}
}
const BITS: usize = usize::BITS as usize;
const BIT_MASK: usize = (usize::BITS - 1) as usize;
impl IdleMap {
fn new(cores: &[Box<Core>]) -> IdleMap {
let ret = IdleMap::new_n(num_chunks(cores.len()));
ret.set_all(cores);
ret
}
fn new_n(n: usize) -> IdleMap {
let chunks = (0..n).map(|_| AtomicUsize::new(0)).collect();
IdleMap { chunks }
}
fn set(&self, index: usize) {
let (chunk, mask) = index_to_mask(index);
let prev = self.chunks[chunk].load(Acquire);
let next = prev | mask;
self.chunks[chunk].store(next, Release);
}
fn set_all(&self, cores: &[Box<Core>]) {
for core in cores {
self.set(core.index);
}
}
fn unset(&self, index: usize) {
let (chunk, mask) = index_to_mask(index);
let prev = self.chunks[chunk].load(Acquire);
let next = prev & !mask;
self.chunks[chunk].store(next, Release);
}
fn matches(&self, idle_cores: &[Box<Core>]) -> bool {
let expect = IdleMap::new_n(self.chunks.len());
expect.set_all(idle_cores);
for (i, chunk) in expect.chunks.iter().enumerate() {
if chunk.load(Acquire) != self.chunks[i].load(Acquire) {
return false;
}
}
true
}
}
impl Snapshot {
pub(crate) fn new(idle: &Idle) -> Snapshot {
let chunks = vec![0; idle.idle_map.chunks.len()];
let mut ret = Snapshot { chunks };
ret.update(&idle.idle_map);
ret
}
fn update(&mut self, idle_map: &IdleMap) {
for i in 0..self.chunks.len() {
self.chunks[i] = idle_map.chunks[i].load(Acquire);
}
}
pub(super) fn is_idle(&self, index: usize) -> bool {
let (chunk, mask) = index_to_mask(index);
debug_assert!(
chunk < self.chunks.len(),
"index={}; chunks={}",
index,
self.chunks.len()
);
self.chunks[chunk] & mask == mask
}
}
fn num_chunks(max_cores: usize) -> usize {
(max_cores / BITS) + 1
}
fn index_to_mask(index: usize) -> (usize, usize) {
let mask = 1 << (index & BIT_MASK);
let chunk = index / BITS;
(chunk, mask)
}
fn num_active_workers(synced: &Synced) -> usize {
synced.available_cores.capacity() - synced.available_cores.len()
}