1use crate::context::{Context, ExecutionPhase};
18use crate::metrics::MetricStore;
19use crossbeam_channel::{Receiver, Sender};
20use khora_core::agent::Agent;
21use khora_core::telemetry::TelemetryEvent;
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::Arc;
24use std::thread;
25use std::time::{Duration, Instant};
26
27use crate::analysis::HeuristicEngine;
28use crate::gorna::GornaArbitrator;
29use crate::registry::AgentRegistry;
30use khora_core::control::gorna::AgentId;
31use std::sync::Mutex;
32
/// Tunable parameters for the DCC service.
#[derive(Debug, Clone)]
pub struct DccConfig {
    /// Control-loop frequency in ticks per second; the loop period is
    /// `1 / tick_rate` seconds.
    pub tick_rate: u32,
    /// Capacity of the bounded telemetry channel created when the service is
    /// constructed.
    pub telemetry_buffer_size: usize,
    /// Timeout in milliseconds that is converted to a `Duration` and handed to
    /// the GORNA arbitrator when the service starts.
    // NOTE(review): presumably bounds how long the arbitrator waits on an
    // agent's mutex — confirm against `GornaArbitrator`.
    pub agent_lock_timeout_ms: u64,
}

impl Default for DccConfig {
    /// Returns the stock configuration: 20 ticks per second, a 1000-event
    /// telemetry buffer and a 100 ms agent lock timeout.
    fn default() -> Self {
        const TICK_RATE_HZ: u32 = 20;
        const TELEMETRY_BUFFER_EVENTS: usize = 1000;
        const AGENT_LOCK_TIMEOUT_MS: u64 = 100;

        DccConfig {
            agent_lock_timeout_ms: AGENT_LOCK_TIMEOUT_MS,
            telemetry_buffer_size: TELEMETRY_BUFFER_EVENTS,
            tick_rate: TICK_RATE_HZ,
        }
    }
}
55
/// Background control service: owns the shared context, the agent registry and
/// the handle of the worker thread that ingests telemetry and drives
/// negotiation.
pub struct DccService {
    /// Configuration supplied at construction; read when the worker is started.
    config: DccConfig,
    /// Shared context, written by the worker thread and snapshotted by
    /// `get_context`.
    context: Arc<std::sync::RwLock<Context>>,
    /// Registered agents, locked both by the worker thread (for negotiation)
    /// and by the caller-side agent methods.
    registry: Arc<std::sync::Mutex<AgentRegistry>>,
    /// Flag polled by the worker loop; set by `start`, cleared by `stop`.
    running: Arc<AtomicBool>,
    /// Join handle of the worker thread; `None` until `start` spawns it and
    /// again after `stop` has joined it.
    handle: Option<thread::JoinHandle<()>>,
    /// Sending half of the telemetry channel, cloned out by `event_sender`.
    event_tx: Sender<TelemetryEvent>,
}
67
68impl DccService {
69 pub fn new(config: DccConfig) -> (Self, Receiver<TelemetryEvent>) {
71 let (tx, rx) = crossbeam_channel::bounded(config.telemetry_buffer_size);
72 let service = Self {
73 config,
74 context: Arc::new(std::sync::RwLock::new(Context::default())),
75 registry: Arc::new(std::sync::Mutex::new(AgentRegistry::new())),
76 running: Arc::new(AtomicBool::new(false)),
77 handle: None,
78 event_tx: tx,
79 };
80 (service, rx)
81 }
82
83 pub fn register_agent(&self, agent: Arc<std::sync::Mutex<dyn Agent>>, priority: f32) {
87 let mut registry = self.registry.lock().unwrap();
88 registry.register(agent, priority);
89 }
90
91 pub fn start(&mut self, event_rx: Receiver<TelemetryEvent>) {
93 if self.running.load(Ordering::SeqCst) {
94 return;
95 }
96
97 self.running.store(true, Ordering::SeqCst);
98 let running = Arc::clone(&self.running);
99 let context = Arc::clone(&self.context);
100 let registry = Arc::clone(&self.registry);
101 let tick_duration = Duration::from_secs_f32(1.0 / self.config.tick_rate as f32);
102 let agent_lock_timeout = Duration::from_millis(self.config.agent_lock_timeout_ms);
103
104 let handle = thread::spawn(move || {
105 let mut store = MetricStore::new();
106 let heuristic_engine = HeuristicEngine;
107 let arbitrator = GornaArbitrator::new(agent_lock_timeout);
108 let mut initial_negotiation_done = false;
109
110 log::info!("DCC Service thread started.");
111
112 while running.load(Ordering::Relaxed) {
113 let start_time = Instant::now();
114
115 while let Ok(event) = event_rx.try_recv() {
117 match event {
118 TelemetryEvent::MetricUpdate { id, value } => {
119 if let Some(v) = value.as_f64() {
120 store.push(id, v as f32);
121 }
122 }
123 TelemetryEvent::ResourceReport(_) => {}
124 TelemetryEvent::HardwareReport(report) => {
125 let mut ctx = context.write().unwrap();
126 ctx.hardware.thermal = report.thermal;
127 ctx.hardware.battery = report.battery;
128 ctx.hardware.cpu_load = report.cpu_load;
129 ctx.hardware.gpu_load = report.gpu_load.unwrap_or(0.0);
130 ctx.hardware.available_vram = report.gpu_timings.as_ref().map(|_| 0);
131 ctx.refresh_budget_multiplier();
132
133 if let Some(gpu_timings) = report.gpu_timings {
134 if let Some(frame_time_us) = gpu_timings.frame_total_duration_us() {
135 store.push(
136 khora_core::telemetry::MetricId::new(
137 "renderer",
138 "frame_time",
139 ),
140 frame_time_us as f32 / 1000.0,
141 );
142 }
143 }
144
145 log::debug!(
146 "DCC Hardware: Thermal={:?}, CPU={:.2}, GPU={:?}",
147 ctx.hardware.thermal,
148 ctx.hardware.cpu_load,
149 ctx.hardware.gpu_load
150 );
151 }
152 TelemetryEvent::PhaseChange(phase_name) => {
153 let mut ctx = context.write().unwrap();
154 if let Some(new_phase) = ExecutionPhase::from_name(&phase_name) {
155 if ctx.phase.can_transition_to(new_phase) {
156 log::debug!("DCC Phase: {:?} → {:?}", ctx.phase, new_phase);
157 ctx.phase = new_phase;
158 } else {
159 log::warn!(
160 "DCC: Invalid transition {:?} → {:?}",
161 ctx.phase,
162 new_phase
163 );
164 }
165 } else {
166 log::warn!("DCC: Unknown phase '{}'", phase_name);
167 }
168 }
169 TelemetryEvent::GpuReport(report) => {
170 if let Some(frame_time_us) = report.frame_total_duration_us() {
171 store.push(
172 khora_core::telemetry::MetricId::new(
173 "renderer",
174 "gpu_frame_time",
175 ),
176 frame_time_us as f32 / 1000.0,
177 );
178 }
179 store.push(
180 khora_core::telemetry::MetricId::new("renderer", "draw_calls"),
181 report.draw_calls as f32,
182 );
183 store.push(
184 khora_core::telemetry::MetricId::new(
185 "renderer",
186 "triangles_rendered",
187 ),
188 report.triangles_rendered as f32,
189 );
190 }
191 }
192 }
193
194 let (report, ctx_copy) = {
196 let mut ctx = context.write().unwrap();
197 ctx.refresh_budget_multiplier();
198 let report = heuristic_engine.analyze(&ctx, &store);
199 (report, ctx.clone())
200 };
201
202 for alert in &report.alerts {
203 log::info!("DCC Analysis: {}", alert);
204 }
205
206 if report.needs_negotiation || !initial_negotiation_done {
208 let registry_lock = registry.lock().unwrap();
209 if !registry_lock.is_empty() {
210 let agents: Vec<_> = registry_lock.iter().cloned().collect();
211 drop(registry_lock);
212
213 let mut agents_slice: Vec<Arc<std::sync::Mutex<dyn Agent>>> = agents;
214 arbitrator.arbitrate(&ctx_copy, &report, &mut agents_slice);
215 initial_negotiation_done = true;
216 }
217 }
218
219 let elapsed = start_time.elapsed();
221 if elapsed < tick_duration {
222 thread::sleep(tick_duration - elapsed);
223 }
224 }
225 log::info!("DCC Service thread stopped.");
226 });
227
228 self.handle = Some(handle);
229 }
230
231 pub fn stop(&mut self) {
233 self.running.store(false, Ordering::SeqCst);
234 if let Some(handle) = self.handle.take() {
235 let _ = handle.join();
236 }
237 }
238
239 pub fn event_sender(&self) -> Sender<TelemetryEvent> {
241 self.event_tx.clone()
242 }
243
244 pub fn get_context(&self) -> Context {
246 self.context.read().unwrap().clone()
247 }
248
249 pub fn update_agents(&self, context: &mut khora_core::EngineContext<'_>) {
253 if let Ok(registry) = self.registry.lock() {
254 registry.update_all(context);
255 }
256 }
257
258 pub fn execute_agents(&self) {
263 if let Ok(registry) = self.registry.lock() {
264 registry.execute_all();
265 }
266 }
267
268 pub fn agent_count(&self) -> usize {
270 self.registry.lock().map(|r| r.len()).unwrap_or(0)
271 }
272
273 pub fn get_agent(&self, id: AgentId) -> Option<Arc<Mutex<dyn Agent>>> {
275 self.registry.lock().ok()?.get_by_id(id)
276 }
277}
278
279impl Drop for DccService {
280 fn drop(&mut self) {
281 self.stop();
282 }
283}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288 use khora_core::control::gorna::{
289 AgentId, AgentStatus, NegotiationRequest, NegotiationResponse, ResourceBudget, StrategyId,
290 StrategyOption,
291 };
292 use khora_core::telemetry::{MetricId, MetricValue};
293
294 struct StubAgent {
295 budget_applied: bool,
296 }
297
298 impl Agent for StubAgent {
299 fn id(&self) -> AgentId {
300 AgentId::Renderer
301 }
302 fn negotiate(&mut self, _: NegotiationRequest) -> NegotiationResponse {
303 NegotiationResponse {
304 strategies: vec![StrategyOption {
305 id: StrategyId::Balanced,
306 estimated_time: Duration::from_millis(8),
307 estimated_vram: 1024,
308 }],
309 }
310 }
311 fn apply_budget(&mut self, _: ResourceBudget) {
312 self.budget_applied = true;
313 }
314 fn update(&mut self, _: &mut khora_core::EngineContext<'_>) {}
315 fn report_status(&self) -> AgentStatus {
316 AgentStatus {
317 agent_id: AgentId::Renderer,
318 current_strategy: StrategyId::Balanced,
319 health_score: 1.0,
320 is_stalled: false,
321 message: String::new(),
322 }
323 }
324 fn execute(&mut self) {}
325 fn as_any(&self) -> &dyn std::any::Any {
326 self
327 }
328 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
329 self
330 }
331 }
332
333 #[test]
334 fn test_dcc_service_lifecycle() {
335 let (mut dcc, rx) = DccService::new(DccConfig::default());
336 dcc.start(rx);
337 assert!(dcc.running.load(Ordering::SeqCst));
338 dcc.stop();
339 assert!(!dcc.running.load(Ordering::SeqCst));
340 }
341
342 #[test]
343 fn test_dcc_phase_change_ingestion() {
344 let (mut dcc, rx) = DccService::new(DccConfig::default());
345 let tx = dcc.event_sender();
346 dcc.start(rx);
347
348 tx.send(TelemetryEvent::PhaseChange("Simulation".to_string()))
349 .unwrap();
350
351 thread::sleep(Duration::from_millis(100));
352
353 let ctx = dcc.get_context();
354 assert_eq!(ctx.phase, ExecutionPhase::Simulation);
355
356 dcc.stop();
357 }
358
359 #[test]
360 fn test_dcc_metric_ingestion_smoke() {
361 let (mut dcc, rx) = DccService::new(DccConfig::default());
362 let tx = dcc.event_sender();
363 dcc.start(rx);
364
365 let id = MetricId::new("test", "metric");
366 tx.send(TelemetryEvent::MetricUpdate {
367 id,
368 value: MetricValue::Gauge(42.0),
369 })
370 .unwrap();
371
372 thread::sleep(Duration::from_millis(50));
373 dcc.stop();
374 }
375
376 #[test]
377 fn test_dcc_initial_negotiation_fires_with_agent() {
378 let (mut dcc, rx) = DccService::new(DccConfig {
379 tick_rate: 100,
380 ..Default::default()
381 });
382 let agent = Arc::new(std::sync::Mutex::new(StubAgent {
383 budget_applied: false,
384 }));
385 dcc.register_agent(agent.clone(), 1.0);
386 dcc.start(rx);
387
388 thread::sleep(Duration::from_millis(200));
389
390 let applied = agent.lock().unwrap().budget_applied;
391 dcc.stop();
392
393 assert!(
394 applied,
395 "Initial GORNA negotiation should have called apply_budget"
396 );
397 }
398}