khora_telemetry/storage/
memory_backend.rs

1// Copyright 2025 eraflo
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! In-memory metrics storage backend.
16
17use crate::storage::backend::{BackendStats, MetricsBackend};
18use khora_core::telemetry::metrics::{Metric, MetricId, MetricType, MetricsError, MetricsResult};
19use std::collections::HashMap;
20use std::sync::RwLock;
21
22/// High-performance in-memory metrics backend using RwLock<HashMap>
23///
24/// This implementation provides:
25/// - Thread-safe concurrent access (multiple readers, single writer)
26/// - O(1) average case lookup and insertion
27/// - Memory-efficient storage
28/// - Lock-free reads when possible
29#[derive(Debug)]
30pub struct InMemoryBackend {
31    /// The core storage - RwLock allows concurrent reads
32    storage: RwLock<HashMap<MetricId, Metric>>,
33}
34
35impl InMemoryBackend {
36    /// Create a new in-memory backend
37    pub fn new() -> Self {
38        Self {
39            storage: RwLock::new(HashMap::new()),
40        }
41    }
42
43    /// Create a new in-memory backend with initial capacity
44    pub fn with_capacity(capacity: usize) -> Self {
45        Self {
46            storage: RwLock::new(HashMap::with_capacity(capacity)),
47        }
48    }
49
50    /// Get statistics about this backend
51    pub fn get_stats(&self) -> BackendStats {
52        let storage = self.storage.read().unwrap();
53
54        let mut counter_count = 0;
55        let mut gauge_count = 0;
56        let mut histogram_count = 0;
57
58        for metric in storage.values() {
59            match metric.value.metric_type() {
60                MetricType::Counter => counter_count += 1,
61                MetricType::Gauge => gauge_count += 1,
62                MetricType::Histogram => histogram_count += 1,
63            }
64        }
65
66        // Rough estimate of memory usage
67        let estimated_memory_bytes = storage.len() * std::mem::size_of::<(MetricId, Metric)>()
68            + storage.capacity() * std::mem::size_of::<(MetricId, Metric)>();
69
70        BackendStats {
71            total_metrics: storage.len(),
72            counter_count,
73            gauge_count,
74            histogram_count,
75            estimated_memory_bytes,
76        }
77    }
78
79    /// Get metrics by namespace
80    pub fn get_metrics_by_namespace(&self, namespace: &str) -> Vec<Metric> {
81        let storage = self.storage.read().unwrap();
82        storage
83            .values()
84            .filter(|metric| metric.metadata.id.namespace == namespace)
85            .cloned()
86            .collect()
87    }
88
89    /// Get metrics by type
90    pub fn get_metrics_by_type(&self, metric_type: MetricType) -> Vec<Metric> {
91        let storage = self.storage.read().unwrap();
92        storage
93            .values()
94            .filter(|metric| metric.metadata.metric_type == metric_type)
95            .cloned()
96            .collect()
97    }
98
99    /// Bulk insert metrics (more efficient than individual puts)
100    pub fn put_metrics(&self, metrics: Vec<Metric>) -> MetricsResult<()> {
101        let mut storage = self
102            .storage
103            .write()
104            .map_err(|_| MetricsError::StorageError("Failed to acquire write lock".to_string()))?;
105
106        for metric in metrics {
107            storage.insert(metric.metadata.id.clone(), metric);
108        }
109
110        Ok(())
111    }
112
113    /// Remove metrics by namespace
114    pub fn remove_metrics_by_namespace(&self, namespace: &str) -> MetricsResult<usize> {
115        let mut storage = self
116            .storage
117            .write()
118            .map_err(|_| MetricsError::StorageError("Failed to acquire write lock".to_string()))?;
119
120        let to_remove: Vec<_> = storage
121            .keys()
122            .filter(|id| id.namespace == namespace)
123            .cloned()
124            .collect();
125
126        let removed_count = to_remove.len();
127        for id in to_remove {
128            storage.remove(&id);
129        }
130
131        Ok(removed_count)
132    }
133}
134
135impl Default for InMemoryBackend {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141impl MetricsBackend for InMemoryBackend {
142    fn as_any(&self) -> &dyn std::any::Any {
143        self
144    }
145
146    fn put_metric(&self, metric: Metric) -> MetricsResult<()> {
147        let mut storage = self
148            .storage
149            .write()
150            .map_err(|_| MetricsError::StorageError("Failed to acquire write lock".to_string()))?;
151
152        storage.insert(metric.metadata.id.clone(), metric);
153        Ok(())
154    }
155
156    fn get_metric(&self, id: &MetricId) -> MetricsResult<Metric> {
157        let storage = self
158            .storage
159            .read()
160            .map_err(|_| MetricsError::StorageError("Failed to acquire read lock".to_string()))?;
161
162        storage
163            .get(id)
164            .cloned()
165            .ok_or_else(|| MetricsError::MetricNotFound(id.clone()))
166    }
167
168    fn contains_metric(&self, id: &MetricId) -> bool {
169        if let Ok(storage) = self.storage.read() {
170            storage.contains_key(id)
171        } else {
172            false
173        }
174    }
175
176    fn remove_metric(&self, id: &MetricId) -> MetricsResult<()> {
177        let mut storage = self
178            .storage
179            .write()
180            .map_err(|_| MetricsError::StorageError("Failed to acquire write lock".to_string()))?;
181
182        storage
183            .remove(id)
184            .map(|_| ())
185            .ok_or_else(|| MetricsError::MetricNotFound(id.clone()))
186    }
187
188    fn list_metric_ids(&self) -> Vec<MetricId> {
189        if let Ok(storage) = self.storage.read() {
190            storage.keys().cloned().collect()
191        } else {
192            Vec::new()
193        }
194    }
195
196    fn list_all_metrics(&self) -> Vec<Metric> {
197        if let Ok(storage) = self.storage.read() {
198            storage.values().cloned().collect()
199        } else {
200            Vec::new()
201        }
202    }
203
204    fn clear_all(&self) -> MetricsResult<()> {
205        let mut storage = self
206            .storage
207            .write()
208            .map_err(|_| MetricsError::StorageError("Failed to acquire write lock".to_string()))?;
209
210        storage.clear();
211        Ok(())
212    }
213
214    fn metric_count(&self) -> usize {
215        if let Ok(storage) = self.storage.read() {
216            storage.len()
217        } else {
218            0
219        }
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use super::*;
226    use khora_core::telemetry::metrics::{Metric, MetricId, MetricValue};
227
228    #[test]
229    fn test_in_memory_backend_basic_operations() {
230        let backend = InMemoryBackend::new();
231        let id = MetricId::new("test", "counter");
232        let metric = Metric::new_counter(id.clone(), "Test counter", 42);
233
234        // Test put and get
235        assert!(backend.put_metric(metric.clone()).is_ok());
236        assert!(backend.contains_metric(&id));
237
238        let retrieved = backend.get_metric(&id).unwrap();
239        assert_eq!(retrieved.value.as_counter(), Some(42));
240        assert_eq!(backend.metric_count(), 1);
241
242        // Test remove
243        assert!(backend.remove_metric(&id).is_ok());
244        assert!(!backend.contains_metric(&id));
245        assert_eq!(backend.metric_count(), 0);
246    }
247
248    #[test]
249    fn test_counter_increment() {
250        let backend = InMemoryBackend::new();
251        let id = MetricId::new("test", "counter");
252        let metric = Metric::new_counter(id.clone(), "Test counter", 0);
253
254        backend.put_metric(metric).unwrap();
255
256        // Test increment
257        let new_value = backend.increment_counter(&id, 5).unwrap();
258        assert_eq!(new_value, 5);
259
260        let new_value = backend.increment_counter(&id, 3).unwrap();
261        assert_eq!(new_value, 8);
262
263        // Verify the stored value
264        let retrieved = backend.get_metric(&id).unwrap();
265        assert_eq!(retrieved.value.as_counter(), Some(8));
266    }
267
268    #[test]
269    fn test_gauge_operations() {
270        let backend = InMemoryBackend::new();
271        let id = MetricId::new("test", "gauge");
272        let metric = Metric::new_gauge(id.clone(), "Test gauge", "bytes", 100.0);
273
274        backend.put_metric(metric).unwrap();
275
276        // Test set gauge
277        backend.set_gauge(&id, 250.5).unwrap();
278
279        let retrieved = backend.get_metric(&id).unwrap();
280        assert_eq!(retrieved.value.as_gauge(), Some(250.5));
281    }
282
283    #[test]
284    fn test_histogram_operations() {
285        let backend = InMemoryBackend::new();
286        let id = MetricId::new("test", "histogram");
287        let buckets = vec![1.0, 5.0, 10.0, 50.0, 100.0];
288        let metric = Metric::new_histogram(id.clone(), "Test histogram", "ms", buckets);
289
290        backend.put_metric(metric).unwrap();
291
292        // Add some samples
293        backend.record_histogram_sample(&id, 0.5).unwrap(); // <= 1.0, 5.0, 10.0, 50.0, 100.0
294        backend.record_histogram_sample(&id, 3.0).unwrap(); // <= 5.0, 10.0, 50.0, 100.0 (not <= 1.0)
295        backend.record_histogram_sample(&id, 7.0).unwrap(); // <= 10.0, 50.0, 100.0 (not <= 1.0, 5.0)
296        backend.record_histogram_sample(&id, 25.0).unwrap(); // <= 50.0, 100.0 (not <= 1.0, 5.0, 10.0)
297
298        let retrieved = backend.get_metric(&id).unwrap();
299        if let MetricValue::Histogram {
300            samples,
301            bucket_counts,
302            ..
303        } = retrieved.value
304        {
305            assert_eq!(samples.len(), 4);
306            // Cumulative buckets: each bucket counts all samples <= its bound
307            assert_eq!(bucket_counts[0], 1); // 0.5 <= 1.0
308            assert_eq!(bucket_counts[1], 2); // 0.5, 3.0 <= 5.0
309            assert_eq!(bucket_counts[2], 3); // 0.5, 3.0, 7.0 <= 10.0
310            assert_eq!(bucket_counts[3], 4); // 0.5, 3.0, 7.0, 25.0 <= 50.0
311            assert_eq!(bucket_counts[4], 4); // all samples <= 100.0
312        }
313    }
314
315    #[test]
316    fn test_bulk_operations() {
317        let backend = InMemoryBackend::new();
318
319        let metrics = vec![
320            Metric::new_counter(MetricId::new("engine", "frame_count"), "Frame counter", 100),
321            Metric::new_gauge(
322                MetricId::new("memory", "heap_mb"),
323                "Heap usage",
324                "MB",
325                512.0,
326            ),
327            Metric::new_counter(
328                MetricId::new("renderer", "triangles"),
329                "Triangle counter",
330                50000,
331            ),
332        ];
333
334        backend.put_metrics(metrics).unwrap();
335        assert_eq!(backend.metric_count(), 3);
336
337        // Test namespace filtering
338        let engine_metrics = backend.get_metrics_by_namespace("engine");
339        assert_eq!(engine_metrics.len(), 1);
340        assert_eq!(engine_metrics[0].metadata.id.name, "frame_count");
341
342        // Test type filtering
343        let counters = backend.get_metrics_by_type(MetricType::Counter);
344        assert_eq!(counters.len(), 2);
345
346        // Test namespace removal
347        let removed = backend.remove_metrics_by_namespace("engine").unwrap();
348        assert_eq!(removed, 1);
349        assert_eq!(backend.metric_count(), 2);
350    }
351
352    #[test]
353    fn test_backend_stats() {
354        let backend = InMemoryBackend::new();
355
356        backend
357            .put_metric(Metric::new_counter(
358                MetricId::new("test", "c1"),
359                "Counter 1",
360                0,
361            ))
362            .unwrap();
363        backend
364            .put_metric(Metric::new_counter(
365                MetricId::new("test", "c2"),
366                "Counter 2",
367                0,
368            ))
369            .unwrap();
370        backend
371            .put_metric(Metric::new_gauge(
372                MetricId::new("test", "g1"),
373                "Gauge 1",
374                "unit",
375                0.0,
376            ))
377            .unwrap();
378
379        let stats = backend.get_stats();
380        assert_eq!(stats.total_metrics, 3);
381        assert_eq!(stats.counter_count, 2);
382        assert_eq!(stats.gauge_count, 1);
383        assert_eq!(stats.histogram_count, 0);
384        assert!(stats.estimated_memory_bytes > 0);
385    }
386
387    #[test]
388    fn test_type_mismatch_errors() {
389        let backend = InMemoryBackend::new();
390        let id = MetricId::new("test", "gauge");
391        let metric = Metric::new_gauge(id.clone(), "Test gauge", "bytes", 100.0);
392
393        backend.put_metric(metric).unwrap();
394
395        // Try to increment a gauge as if it were a counter
396        let result = backend.increment_counter(&id, 5);
397        assert!(result.is_err());
398        if let Err(MetricsError::TypeMismatch { expected, found }) = result {
399            assert_eq!(expected, MetricType::Counter);
400            assert_eq!(found, MetricType::Gauge);
401        }
402    }
403
404    #[test]
405    fn test_not_found_errors() {
406        let backend = InMemoryBackend::new();
407        let id = MetricId::new("test", "nonexistent");
408
409        let result = backend.get_metric(&id);
410        assert!(result.is_err());
411        if let Err(MetricsError::MetricNotFound(missing_id)) = result {
412            assert_eq!(missing_id, id);
413        }
414    }
415}