optimize latency window stats (#10)

This commit is contained in:
Sijie.Sun 2024-02-01 18:32:06 +08:00 committed by GitHub
parent 95a52a4b5c
commit aeb00fb42f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,9 +1,12 @@
use std::sync::atomic::{AtomicU32, AtomicU64}; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::Relaxed};
pub struct WindowLatency { pub struct WindowLatency {
latency_us_window: Vec<AtomicU32>, latency_us_window: Vec<AtomicU32>,
latency_us_window_index: AtomicU32, latency_us_window_index: AtomicU32,
latency_us_window_size: AtomicU32, latency_us_window_size: u32,
sum: AtomicU32,
count: AtomicU32,
} }
impl WindowLatency { impl WindowLatency {
@ -11,39 +14,32 @@ impl WindowLatency {
Self { Self {
latency_us_window: (0..window_size).map(|_| AtomicU32::new(0)).collect(), latency_us_window: (0..window_size).map(|_| AtomicU32::new(0)).collect(),
latency_us_window_index: AtomicU32::new(0), latency_us_window_index: AtomicU32::new(0),
latency_us_window_size: AtomicU32::new(window_size), latency_us_window_size: window_size,
sum: AtomicU32::new(0),
count: AtomicU32::new(0),
} }
} }
pub fn record_latency(&self, latency_us: u32) { pub fn record_latency(&self, latency_us: u32) {
let index = self let index = self.latency_us_window_index.fetch_add(1, Relaxed);
.latency_us_window_index if index < self.latency_us_window_size {
.fetch_add(1, std::sync::atomic::Ordering::Relaxed); self.count.fetch_add(1, Relaxed);
let index = index }
% self
.latency_us_window_size let index = index % self.latency_us_window_size;
.load(std::sync::atomic::Ordering::Relaxed); let old_lat = self.latency_us_window[index as usize].swap(latency_us, Relaxed);
self.latency_us_window[index as usize]
.store(latency_us, std::sync::atomic::Ordering::Relaxed); if old_lat < latency_us {
self.sum.fetch_add(latency_us - old_lat, Relaxed);
} else {
self.sum.fetch_sub(old_lat - latency_us, Relaxed);
}
} }
pub fn get_latency_us<T: From<u32> + std::ops::Div<Output = T>>(&self) -> T { pub fn get_latency_us<T: From<u32> + std::ops::Div<Output = T>>(&self) -> T {
let window_size = self let count = self.count.load(Relaxed);
.latency_us_window_size let sum = self.sum.load(Relaxed);
.load(std::sync::atomic::Ordering::Relaxed);
let mut sum = 0;
let mut count = 0;
for i in 0..window_size {
if i >= self
.latency_us_window_index
.load(std::sync::atomic::Ordering::Relaxed)
{
break;
}
sum += self.latency_us_window[i as usize].load(std::sync::atomic::Ordering::Relaxed);
count += 1;
}
if count == 0 { if count == 0 {
0.into() 0.into()
} else { } else {
@ -72,32 +68,28 @@ impl Throughput {
} }
pub fn tx_bytes(&self) -> u64 { pub fn tx_bytes(&self) -> u64 {
self.tx_bytes.load(std::sync::atomic::Ordering::Relaxed) self.tx_bytes.load(Relaxed)
} }
pub fn rx_bytes(&self) -> u64 { pub fn rx_bytes(&self) -> u64 {
self.rx_bytes.load(std::sync::atomic::Ordering::Relaxed) self.rx_bytes.load(Relaxed)
} }
pub fn tx_packets(&self) -> u64 { pub fn tx_packets(&self) -> u64 {
self.tx_packets.load(std::sync::atomic::Ordering::Relaxed) self.tx_packets.load(Relaxed)
} }
pub fn rx_packets(&self) -> u64 { pub fn rx_packets(&self) -> u64 {
self.rx_packets.load(std::sync::atomic::Ordering::Relaxed) self.rx_packets.load(Relaxed)
} }
pub fn record_tx_bytes(&self, bytes: u64) { pub fn record_tx_bytes(&self, bytes: u64) {
self.tx_bytes self.tx_bytes.fetch_add(bytes, Relaxed);
.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed); self.tx_packets.fetch_add(1, Relaxed);
self.tx_packets
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
} }
pub fn record_rx_bytes(&self, bytes: u64) { pub fn record_rx_bytes(&self, bytes: u64) {
self.rx_bytes self.rx_bytes.fetch_add(bytes, Relaxed);
.fetch_add(bytes, std::sync::atomic::Ordering::Relaxed); self.rx_packets.fetch_add(1, Relaxed);
self.rx_packets
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
} }
} }