khora_control/
service.rs

// Copyright 2025 eraflo
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Central service for the Dynamic Context Core.

17use crate::context::{Context, ExecutionPhase};
18use crate::metrics::MetricStore;
19use crossbeam_channel::{Receiver, Sender};
20use khora_core::agent::Agent;
21use khora_core::telemetry::TelemetryEvent;
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::Arc;
24use std::thread;
25use std::time::{Duration, Instant};
26
27use crate::analysis::HeuristicEngine;
28use crate::gorna::GornaArbitrator;
29use crate::registry::AgentRegistry;
30use khora_core::control::gorna::AgentId;
31use std::sync::Mutex;
32
/// Configuration for the DCC Service.
#[derive(Debug, Clone)]
pub struct DccConfig {
    /// Frequency of the analysis loop, in ticks per second (Hz).
    pub tick_rate: u32,
    /// Capacity of the bounded telemetry channel created by `DccService::new`.
    ///
    /// Once the channel holds this many undelivered events, `Sender::try_send`
    /// fails and `Sender::send` blocks until the DCC thread drains it. Producers
    /// that must never stall should use `try_send` and drop the event on failure.
    pub telemetry_buffer_size: usize,
    /// Time budget, in milliseconds, handed to the GORNA arbitrator for acquiring
    /// each agent's lock during negotiation.
    ///
    /// NOTE(review): agents whose lock cannot be taken in time are presumably
    /// skipped for that round — confirm against `GornaArbitrator`.
    pub agent_lock_timeout_ms: u64,
}

46impl Default for DccConfig {
47    fn default() -> Self {
48        Self {
49            tick_rate: 20,
50            telemetry_buffer_size: 1000,
51            agent_lock_timeout_ms: 100,
52        }
53    }
54}
55
56/// The Dynamic Context Core service.
57///
58/// Manages the cold-path analysis loop, GORNA arbitration, and agent coordination.
59pub struct DccService {
60    config: DccConfig,
61    context: Arc<std::sync::RwLock<Context>>,
62    registry: Arc<std::sync::Mutex<AgentRegistry>>,
63    running: Arc<AtomicBool>,
64    handle: Option<thread::JoinHandle<()>>,
65    event_tx: Sender<TelemetryEvent>,
66}
67
68impl DccService {
69    /// Creates a new DCC service.
70    pub fn new(config: DccConfig) -> (Self, Receiver<TelemetryEvent>) {
71        let (tx, rx) = crossbeam_channel::bounded(config.telemetry_buffer_size);
72        let service = Self {
73            config,
74            context: Arc::new(std::sync::RwLock::new(Context::default())),
75            registry: Arc::new(std::sync::Mutex::new(AgentRegistry::new())),
76            running: Arc::new(AtomicBool::new(false)),
77            handle: None,
78            event_tx: tx,
79        };
80        (service, rx)
81    }
82
83    /// Registers an agent with a priority value.
84    ///
85    /// Higher priority values mean the agent is updated first in each frame.
86    pub fn register_agent(&self, agent: Arc<std::sync::Mutex<dyn Agent>>, priority: f32) {
87        let mut registry = self.registry.lock().unwrap();
88        registry.register(agent, priority);
89    }
90
91    /// Starts the DCC background thread.
92    pub fn start(&mut self, event_rx: Receiver<TelemetryEvent>) {
93        if self.running.load(Ordering::SeqCst) {
94            return;
95        }
96
97        self.running.store(true, Ordering::SeqCst);
98        let running = Arc::clone(&self.running);
99        let context = Arc::clone(&self.context);
100        let registry = Arc::clone(&self.registry);
101        let tick_duration = Duration::from_secs_f32(1.0 / self.config.tick_rate as f32);
102        let agent_lock_timeout = Duration::from_millis(self.config.agent_lock_timeout_ms);
103
104        let handle = thread::spawn(move || {
105            let mut store = MetricStore::new();
106            let heuristic_engine = HeuristicEngine;
107            let arbitrator = GornaArbitrator::new(agent_lock_timeout);
108            let mut initial_negotiation_done = false;
109
110            log::info!("DCC Service thread started.");
111
112            while running.load(Ordering::Relaxed) {
113                let start_time = Instant::now();
114
115                // 1. Ingest all pending events
116                while let Ok(event) = event_rx.try_recv() {
117                    match event {
118                        TelemetryEvent::MetricUpdate { id, value } => {
119                            if let Some(v) = value.as_f64() {
120                                store.push(id, v as f32);
121                            }
122                        }
123                        TelemetryEvent::ResourceReport(_) => {}
124                        TelemetryEvent::HardwareReport(report) => {
125                            let mut ctx = context.write().unwrap();
126                            ctx.hardware.thermal = report.thermal;
127                            ctx.hardware.battery = report.battery;
128                            ctx.hardware.cpu_load = report.cpu_load;
129                            ctx.hardware.gpu_load = report.gpu_load.unwrap_or(0.0);
130                            ctx.hardware.available_vram = report.gpu_timings.as_ref().map(|_| 0);
131                            ctx.refresh_budget_multiplier();
132
133                            if let Some(gpu_timings) = report.gpu_timings {
134                                if let Some(frame_time_us) = gpu_timings.frame_total_duration_us() {
135                                    store.push(
136                                        khora_core::telemetry::MetricId::new(
137                                            "renderer",
138                                            "frame_time",
139                                        ),
140                                        frame_time_us as f32 / 1000.0,
141                                    );
142                                }
143                            }
144
145                            log::debug!(
146                                "DCC Hardware: Thermal={:?}, CPU={:.2}, GPU={:?}",
147                                ctx.hardware.thermal,
148                                ctx.hardware.cpu_load,
149                                ctx.hardware.gpu_load
150                            );
151                        }
152                        TelemetryEvent::PhaseChange(phase_name) => {
153                            let mut ctx = context.write().unwrap();
154                            if let Some(new_phase) = ExecutionPhase::from_name(&phase_name) {
155                                if ctx.phase.can_transition_to(new_phase) {
156                                    log::debug!("DCC Phase: {:?} → {:?}", ctx.phase, new_phase);
157                                    ctx.phase = new_phase;
158                                } else {
159                                    log::warn!(
160                                        "DCC: Invalid transition {:?} → {:?}",
161                                        ctx.phase,
162                                        new_phase
163                                    );
164                                }
165                            } else {
166                                log::warn!("DCC: Unknown phase '{}'", phase_name);
167                            }
168                        }
169                        TelemetryEvent::GpuReport(report) => {
170                            if let Some(frame_time_us) = report.frame_total_duration_us() {
171                                store.push(
172                                    khora_core::telemetry::MetricId::new(
173                                        "renderer",
174                                        "gpu_frame_time",
175                                    ),
176                                    frame_time_us as f32 / 1000.0,
177                                );
178                            }
179                            store.push(
180                                khora_core::telemetry::MetricId::new("renderer", "draw_calls"),
181                                report.draw_calls as f32,
182                            );
183                            store.push(
184                                khora_core::telemetry::MetricId::new(
185                                    "renderer",
186                                    "triangles_rendered",
187                                ),
188                                report.triangles_rendered as f32,
189                            );
190                        }
191                    }
192                }
193
194                // 2. Perform Analysis & Arbitration
195                let (report, ctx_copy) = {
196                    let mut ctx = context.write().unwrap();
197                    ctx.refresh_budget_multiplier();
198                    let report = heuristic_engine.analyze(&ctx, &store);
199                    (report, ctx.clone())
200                };
201
202                for alert in &report.alerts {
203                    log::info!("DCC Analysis: {}", alert);
204                }
205
206                // 3. GORNA Negotiation
207                if report.needs_negotiation || !initial_negotiation_done {
208                    let registry_lock = registry.lock().unwrap();
209                    if !registry_lock.is_empty() {
210                        let agents: Vec<_> = registry_lock.iter().cloned().collect();
211                        drop(registry_lock);
212
213                        let mut agents_slice: Vec<Arc<std::sync::Mutex<dyn Agent>>> = agents;
214                        arbitrator.arbitrate(&ctx_copy, &report, &mut agents_slice);
215                        initial_negotiation_done = true;
216                    }
217                }
218
219                // 4. Sleep until next tick
220                let elapsed = start_time.elapsed();
221                if elapsed < tick_duration {
222                    thread::sleep(tick_duration - elapsed);
223                }
224            }
225            log::info!("DCC Service thread stopped.");
226        });
227
228        self.handle = Some(handle);
229    }
230
231    /// Stops the DCC background thread.
232    pub fn stop(&mut self) {
233        self.running.store(false, Ordering::SeqCst);
234        if let Some(handle) = self.handle.take() {
235            let _ = handle.join();
236        }
237    }
238
239    /// Returns a sender handle to submit events to the DCC.
240    pub fn event_sender(&self) -> Sender<TelemetryEvent> {
241        self.event_tx.clone()
242    }
243
244    /// Returns the current context.
245    pub fn get_context(&self) -> Context {
246        self.context.read().unwrap().clone()
247    }
248
249    /// Updates all registered agents in priority order.
250    ///
251    /// This is called each frame by the engine loop.
252    pub fn update_agents(&self, context: &mut khora_core::EngineContext<'_>) {
253        if let Ok(registry) = self.registry.lock() {
254            registry.update_all(context);
255        }
256    }
257
258    /// Executes all registered agents in priority order.
259    ///
260    /// Called each frame after [`update_agents`](Self::update_agents).
261    /// Each agent performs its primary work (e.g., the `RenderAgent` renders).
262    pub fn execute_agents(&self) {
263        if let Ok(registry) = self.registry.lock() {
264            registry.execute_all();
265        }
266    }
267
268    /// Returns the number of registered agents.
269    pub fn agent_count(&self) -> usize {
270        self.registry.lock().map(|r| r.len()).unwrap_or(0)
271    }
272
273    /// Returns a reference to the agent with the given ID, if registered.
274    pub fn get_agent(&self, id: AgentId) -> Option<Arc<Mutex<dyn Agent>>> {
275        self.registry.lock().ok()?.get_by_id(id)
276    }
277}
278
279impl Drop for DccService {
280    fn drop(&mut self) {
281        self.stop();
282    }
283}
284
#[cfg(test)]
mod tests {
    use super::*;
    use khora_core::control::gorna::{
        AgentId, AgentStatus, NegotiationRequest, NegotiationResponse, ResourceBudget, StrategyId,
        StrategyOption,
    };
    use khora_core::telemetry::{MetricId, MetricValue};

    /// Upper bound for waiting on the background thread. It is generous so that slow CI
    /// machines do not produce spurious failures; passing runs return much sooner.
    const WAIT_TIMEOUT: Duration = Duration::from_secs(2);

    /// Polls `condition` every few milliseconds until it holds or `timeout` elapses.
    ///
    /// Returns whether the condition was observed to hold. Used instead of fixed
    /// sleeps, which are both slower and flakier than waiting for the actual effect.
    fn wait_for(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        loop {
            if condition() {
                return true;
            }
            if Instant::now() >= deadline {
                return false;
            }
            thread::sleep(Duration::from_millis(5));
        }
    }

    /// Minimal agent that only records whether a budget was applied to it.
    struct StubAgent {
        budget_applied: bool,
    }

    impl Agent for StubAgent {
        fn id(&self) -> AgentId {
            AgentId::Renderer
        }
        fn negotiate(&mut self, _: NegotiationRequest) -> NegotiationResponse {
            NegotiationResponse {
                strategies: vec![StrategyOption {
                    id: StrategyId::Balanced,
                    estimated_time: Duration::from_millis(8),
                    estimated_vram: 1024,
                }],
            }
        }
        fn apply_budget(&mut self, _: ResourceBudget) {
            self.budget_applied = true;
        }
        fn update(&mut self, _: &mut khora_core::EngineContext<'_>) {}
        fn report_status(&self) -> AgentStatus {
            AgentStatus {
                agent_id: AgentId::Renderer,
                current_strategy: StrategyId::Balanced,
                health_score: 1.0,
                is_stalled: false,
                message: String::new(),
            }
        }
        fn execute(&mut self) {}
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
        fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
            self
        }
    }

    #[test]
    fn test_dcc_service_lifecycle() {
        let (mut dcc, rx) = DccService::new(DccConfig::default());
        dcc.start(rx);
        assert!(dcc.running.load(Ordering::SeqCst));
        dcc.stop();
        assert!(!dcc.running.load(Ordering::SeqCst));
    }

    #[test]
    fn test_dcc_phase_change_ingestion() {
        let (mut dcc, rx) = DccService::new(DccConfig::default());
        let tx = dcc.event_sender();
        dcc.start(rx);

        tx.send(TelemetryEvent::PhaseChange("Simulation".to_string()))
            .unwrap();

        wait_for(WAIT_TIMEOUT, || {
            dcc.get_context().phase == ExecutionPhase::Simulation
        });
        assert_eq!(dcc.get_context().phase, ExecutionPhase::Simulation);

        dcc.stop();
    }

    #[test]
    fn test_dcc_metric_ingestion_smoke() {
        let (mut dcc, rx) = DccService::new(DccConfig::default());
        let tx = dcc.event_sender();
        dcc.start(rx);

        let id = MetricId::new("test", "metric");
        tx.send(TelemetryEvent::MetricUpdate {
            id,
            value: MetricValue::Gauge(42.0),
        })
        .unwrap();

        // The queue only empties once the DCC thread has taken the event.
        let drained = wait_for(WAIT_TIMEOUT, || tx.is_empty());
        dcc.stop();

        assert!(drained, "DCC thread should consume the queued metric update");
    }

    #[test]
    fn test_dcc_initial_negotiation_fires_with_agent() {
        let (mut dcc, rx) = DccService::new(DccConfig {
            tick_rate: 100,
            ..Default::default()
        });
        let agent = Arc::new(std::sync::Mutex::new(StubAgent {
            budget_applied: false,
        }));
        dcc.register_agent(agent.clone(), 1.0);
        dcc.start(rx);

        let applied = wait_for(WAIT_TIMEOUT, || agent.lock().unwrap().budget_applied);
        dcc.stop();

        assert!(
            applied,
            "Initial GORNA negotiation should have called apply_budget"
        );
    }
}