use crate::runtime::{Config, MetricsBatch, WorkerMetrics};

use std::cmp;
use std::time::{Duration, Instant};

/// Per-worker statistics. This is used for both tuning the scheduler and
/// reporting runtime-level metrics/stats.
pub(crate) struct Stats {
    /// The metrics batch used to report runtime-level metrics/stats to the
    /// user.
    batch: MetricsBatch,

    /// Exponentially-weighted moving average of the time spent polling a
    /// scheduled task.
    ///
    /// Tracked in nanoseconds, stored as an `f64` since that is what we use in
    /// the EWMA calculations.
    task_poll_time_ewma: f64,
}

/// Transient state used while processing a batch of scheduled tasks.
pub(crate) struct Ephemeral {
    /// Instant at which work last resumed (continued after park).
    ///
    /// This duplicates the value stored in `MetricsBatch`. We will unify
    /// `Stats` and `MetricsBatch` when we stabilize metrics.
    processing_scheduled_tasks_started_at: Instant,

    /// Number of tasks polled in the batch of scheduled tasks.
    tasks_polled_in_batch: usize,

    /// Used to ensure that calls to start / end a batch are paired.
    #[cfg(debug_assertions)]
    batch_started: bool,
}

impl Ephemeral {
    pub(crate) fn new() -> Ephemeral {
        Ephemeral {
            processing_scheduled_tasks_started_at: Instant::now(),
            tasks_polled_in_batch: 0,
            #[cfg(debug_assertions)]
            batch_started: false,
        }
    }
}

/// Weight given to each individual poll time in the EWMA; the value is plucked
/// from thin air.
const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;

/// Ideally, the time between global queue checks wouldn't exceed this; the
/// value is plucked from thin air.
const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64;

/// Max value for the global queue interval. This is roughly 2x the previous
/// default.
const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127;

/// This is the previous default.
const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61;
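
// Together these drive the self-tuning in `tuned_global_queue_interval`: the
// interval is chosen so that roughly `TARGET_GLOBAL_QUEUE_INTERVAL` is spent
// polling local tasks between global queue checks, clamped to the
// `2..=MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL` range.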

impl Stats {
    pub(crate) const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 =
        TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL;

    pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats {
        // Seed the value with what we hope to see.
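        // With the constants above, this seeds the EWMA at
        // 200_000 / 61 ≈ 3_279 ns per poll.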
        let task_poll_time_ewma =
            TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64;

        Stats {
            batch: MetricsBatch::new(worker_metrics),
            task_poll_time_ewma,
        }
    }

    pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 {
        // If an interval is explicitly set, don't tune.
        if let Some(configured) = config.global_queue_interval {
            return configured;
        }

        // As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here.
        let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;
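
        // For example, an EWMA poll time of 5_000 ns works out to
        // 200_000 / 5_000 = 40 local queue polls per global queue check; the
        // clamp below keeps the result within `2..=127`.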

        cmp::max(
            // If we are using self-tuning, we don't want to return less than 2 as that would result in the
            // global queue always getting checked first.
            2,
            cmp::min(
                MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL,
                tasks_per_interval,
            ),
        )
    }

    pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
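        // Flush the batched counters, reporting the poll time EWMA truncated
        // to whole nanoseconds.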
        self.batch.submit(to, self.task_poll_time_ewma as u64);
    }

    pub(crate) fn about_to_park(&mut self) {
        self.batch.about_to_park();
    }

    pub(crate) fn unparked(&mut self) {
        self.batch.unparked();
    }

    pub(crate) fn inc_local_schedule_count(&mut self) {
        self.batch.inc_local_schedule_count();
    }

    pub(crate) fn start_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
        self.batch.start_processing_scheduled_tasks();

        #[cfg(debug_assertions)]
        {
            debug_assert!(!ephemeral.batch_started);
            ephemeral.batch_started = true;
        }

        ephemeral.processing_scheduled_tasks_started_at = Instant::now();
        ephemeral.tasks_polled_in_batch = 0;
    }

    pub(crate) fn end_processing_scheduled_tasks(&mut self, ephemeral: &mut Ephemeral) {
        self.batch.end_processing_scheduled_tasks();

        #[cfg(debug_assertions)]
        {
            debug_assert!(ephemeral.batch_started);
            ephemeral.batch_started = false;
        }

        // Update the EWMA task poll time
        if ephemeral.tasks_polled_in_batch > 0 {
            let now = Instant::now();

            // If we "overflow" this conversion, we have bigger problems than
            // slightly off stats.
            let elapsed = (now - ephemeral.processing_scheduled_tasks_started_at).as_nanos() as f64;
            let num_polls = ephemeral.tasks_polled_in_batch as f64;

            // Calculate the mean poll duration for a single task in the batch
            let mean_poll_duration = elapsed / num_polls;

            // Compute the alpha weighted by the number of tasks polled this batch.
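            //
            // Applying the single-sample update
            // `ewma = alpha * sample + (1 - alpha) * ewma` once per poll, with
            // the batch mean as every sample, decays the old value's weight to
            // `(1 - alpha)^num_polls`; `weighted_alpha` is the complement of
            // that.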
            let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);

            // Now compute the new weighted average task poll time.
            self.task_poll_time_ewma = weighted_alpha * mean_poll_duration
                + (1.0 - weighted_alpha) * self.task_poll_time_ewma;
        }
    }

    pub(crate) fn start_poll(&mut self, ephemeral: &mut Ephemeral) {
        self.batch.start_poll();

        ephemeral.tasks_polled_in_batch += 1;
    }

    pub(crate) fn end_poll(&mut self) {
        self.batch.end_poll();
    }

    pub(crate) fn incr_steal_count(&mut self, by: u16) {
        self.batch.incr_steal_count(by);
    }

    pub(crate) fn incr_steal_operations(&mut self) {
        self.batch.incr_steal_operations();
    }

    pub(crate) fn incr_overflow_count(&mut self) {
        self.batch.incr_overflow_count();
    }
}