1use crate::analysis::AnalysisReport;
28use crate::context::{Context, ExecutionPhase};
29use khora_core::agent::Agent;
30use khora_core::control::gorna::{
31 AgentId, NegotiationRequest, ResourceBudget, ResourceConstraints, StrategyId, StrategyOption,
32};
33use std::sync::{Arc, Mutex};
34use std::time::{Duration, Instant};
35
/// Number of agents reporting `is_stalled` at which `GornaArbitrator::arbitrate`
/// abandons negotiation and forces every agent onto emergency `LowPower`
/// (the check is `stalled_count >= MAX_STALLED_AGENTS`).
const MAX_STALLED_AGENTS: usize = 2;
37
38fn try_lock_agent_with_timeout<T: ?Sized>(
39 mutex: &Mutex<T>,
40 timeout: Duration,
41) -> Option<std::sync::MutexGuard<'_, T>> {
42 let start = Instant::now();
43 loop {
44 match mutex.try_lock() {
45 Ok(guard) => return Some(guard),
46 Err(std::sync::TryLockError::WouldBlock) => {
47 if start.elapsed() >= timeout {
48 return None;
49 }
50 std::thread::yield_now();
51 }
52 Err(std::sync::TryLockError::Poisoned(err)) => {
53 log::error!("Agent mutex poisoned: {}", err);
54 return None;
55 }
56 }
57 }
58}
59
/// Arbiter that negotiates per-frame resource budgets with a set of agents.
///
/// Configured only by the maximum time it will wait for any single agent
/// mutex; see `GornaArbitrator::new`.
#[derive(Debug, Clone)]
pub struct GornaArbitrator {
    /// Upper bound on how long each agent mutex is polled before that agent
    /// is skipped for the current step.
    lock_timeout: Duration,
}
69
/// One agent's answer to a negotiation round, as collected by `arbitrate`.
struct AgentNegotiation {
    /// Position of the agent in the `agents` slice passed to `arbitrate`.
    agent_index: usize,
    /// Identity reported by the agent; used here for logging.
    agent_id: AgentId,
    /// Phase-dependent priority from `get_agent_priority`; `fit_budgets`
    /// visits higher values first.
    priority: f32,
    /// Options offered by the agent, sorted by ascending `estimated_time`.
    /// Never empty: agents that return no strategies are skipped.
    strategies: Vec<StrategyOption>,
}
77
/// The strategy `fit_budgets` selected for one agent.
struct AgentAllocation {
    /// Position of the agent in the `agents` slice passed to `arbitrate`.
    agent_index: usize,
    /// Chosen option; its time and VRAM estimates become the issued budget.
    strategy: StrategyOption,
}
83
84impl GornaArbitrator {
85 pub fn new(lock_timeout: Duration) -> Self {
91 Self { lock_timeout }
92 }
93 pub fn arbitrate(
100 &self,
101 context: &Context,
102 report: &AnalysisReport,
103 agents: &mut [Arc<Mutex<dyn Agent>>],
104 ) {
105 if agents.is_empty() {
106 return;
107 }
108
109 log::debug!(
110 "GORNA: Starting arbitration for {} agents. Phase={:?}, Multiplier={:.2}",
111 agents.len(),
112 context.phase,
113 context.global_budget_multiplier
114 );
115
116 let stalled_count = self.check_agent_health(agents);
118 if stalled_count >= MAX_STALLED_AGENTS || report.death_spiral_detected {
119 log::error!(
120 "GORNA: Death spiral detected ({} stalled agents). \
121 Forcing emergency LowPower on all agents.",
122 stalled_count
123 );
124 self.emergency_stop(agents);
125 return;
126 }
127
128 let base_latency_ms = report.suggested_latency_ms;
131 let effective_budget_ms = base_latency_ms * context.global_budget_multiplier;
133
134 log::debug!(
135 "GORNA: Effective frame budget: {:.2}ms (base={:.2}ms × multiplier={:.2})",
136 effective_budget_ms,
137 base_latency_ms,
138 context.global_budget_multiplier
139 );
140
141 let mut negotiations: Vec<AgentNegotiation> = Vec::with_capacity(agents.len());
143
144 for (i, agent_mutex) in agents.iter().enumerate() {
145 let Some(mut agent) = try_lock_agent_with_timeout(agent_mutex, self.lock_timeout)
146 else {
147 log::warn!(
148 "GORNA: Failed to lock agent {} for negotiation (timeout). Skipping.",
149 i
150 );
151 continue;
152 };
153 let agent_id = agent.id();
154 let priority = self.get_agent_priority(agent_id, context.phase);
155
156 let request = NegotiationRequest {
157 target_latency: Duration::from_secs_f64(effective_budget_ms as f64 / 1000.0),
158 priority_weight: priority,
159 constraints: ResourceConstraints {
160 must_run: self.is_critical_agent(agent_id, context.phase),
161 ..Default::default()
162 },
163 };
164
165 let response = agent.negotiate(request);
166
167 if response.strategies.is_empty() {
168 log::warn!(
169 "GORNA: Agent {:?} returned no strategies. Skipping.",
170 agent_id
171 );
172 continue;
173 }
174
175 let mut strategies = response.strategies;
177 strategies.sort_by(|a, b| a.estimated_time.cmp(&b.estimated_time));
178
179 negotiations.push(AgentNegotiation {
180 agent_index: i,
181 agent_id,
182 priority,
183 strategies,
184 });
185 }
186
187 let max_vram = context
189 .hardware
190 .available_vram
191 .or(context.hardware.total_vram);
192 let allocations = self.fit_budgets(&negotiations, effective_budget_ms, max_vram);
193
194 for alloc in &allocations {
196 let Some(mut agent) =
197 try_lock_agent_with_timeout(&agents[alloc.agent_index], self.lock_timeout)
198 else {
199 log::warn!(
200 "GORNA: Failed to lock agent for budget issuance (index {}). Skipping.",
201 alloc.agent_index
202 );
203 continue;
204 };
205
206 let budget = ResourceBudget {
207 strategy_id: alloc.strategy.id,
208 time_limit: alloc.strategy.estimated_time,
209 memory_limit: Some(alloc.strategy.estimated_vram),
210 extra_params: std::collections::HashMap::new(),
211 };
212
213 log::info!(
214 "GORNA: Issuing budget to {:?} — strategy={:?}, time={:.2}ms, vram={}KB",
215 agent.id(),
216 budget.strategy_id,
217 budget.time_limit.as_secs_f64() * 1000.0,
218 alloc.strategy.estimated_vram / 1024
219 );
220
221 agent.apply_budget(budget);
222 }
223
224 log::debug!(
225 "GORNA: Arbitration complete. {} budgets issued.",
226 allocations.len()
227 );
228 }
229
230 fn check_agent_health(&self, agents: &[Arc<Mutex<dyn Agent>>]) -> usize {
232 let mut stalled = 0;
233 for (i, agent_mutex) in agents.iter().enumerate() {
234 let Some(agent) = try_lock_agent_with_timeout(agent_mutex, self.lock_timeout) else {
235 log::warn!(
236 "GORNA: Failed to lock agent {} for health check (timeout).",
237 i
238 );
239 continue;
240 };
241 let status = agent.report_status();
242 if status.is_stalled {
243 log::warn!(
244 "GORNA: Agent {:?} is STALLED. Health={:.2}, Message: {}",
245 status.agent_id,
246 status.health_score,
247 status.message
248 );
249 stalled += 1;
250 } else if status.health_score < 0.5 {
251 log::warn!(
252 "GORNA: Agent {:?} health degraded ({:.2}). Message: {}",
253 status.agent_id,
254 status.health_score,
255 status.message
256 );
257 }
258 }
259 stalled
260 }
261
262 fn emergency_stop(&self, agents: &mut [Arc<Mutex<dyn Agent>>]) {
264 for (i, agent_mutex) in agents.iter_mut().enumerate() {
265 let Some(mut agent) = try_lock_agent_with_timeout(agent_mutex, self.lock_timeout)
266 else {
267 log::warn!(
268 "GORNA: Failed to lock agent {} for emergency stop (timeout).",
269 i
270 );
271 continue;
272 };
273
274 let budget = ResourceBudget {
275 strategy_id: StrategyId::LowPower,
276 time_limit: Duration::from_millis(2),
277 memory_limit: None,
278 extra_params: std::collections::HashMap::new(),
279 };
280
281 log::warn!("GORNA: Emergency LowPower issued to {:?}.", agent.id());
282 agent.apply_budget(budget);
283 }
284 }
285
286 fn fit_budgets(
294 &self,
295 negotiations: &[AgentNegotiation],
296 total_budget_ms: f32,
297 max_vram_bytes: Option<u64>,
298 ) -> Vec<AgentAllocation> {
299 if negotiations.is_empty() {
300 return Vec::new();
301 }
302
303 let mut sorted_indices: Vec<usize> = (0..negotiations.len()).collect();
304 sorted_indices.sort_by(|&a, &b| {
305 negotiations[b]
306 .priority
307 .partial_cmp(&negotiations[a].priority)
308 .unwrap_or(std::cmp::Ordering::Equal)
309 });
310
311 let mut allocations: Vec<AgentAllocation> = negotiations
312 .iter()
313 .map(|n| AgentAllocation {
314 agent_index: n.agent_index,
315 strategy: n.strategies[0].clone(),
316 })
317 .collect();
318
319 let total_min_ms: f32 = allocations
320 .iter()
321 .map(|a| a.strategy.estimated_time.as_secs_f32() * 1000.0)
322 .sum();
323
324 let total_min_vram: u64 = allocations.iter().map(|a| a.strategy.estimated_vram).sum();
325
326 if total_min_ms > total_budget_ms {
327 log::warn!(
328 "GORNA: Even minimum strategies ({:.2}ms) exceed budget ({:.2}ms). \
329 All agents at LowPower.",
330 total_min_ms,
331 total_budget_ms
332 );
333 return allocations;
334 }
335
336 if let Some(max_vram) = max_vram_bytes {
337 if total_min_vram > max_vram {
338 log::warn!(
339 "GORNA: Even minimum strategies VRAM ({:.2}MB) exceeds budget ({:.2}MB).",
340 total_min_vram as f64 / (1024.0 * 1024.0),
341 max_vram as f64 / (1024.0 * 1024.0)
342 );
343 }
344 }
345
346 let mut remaining_ms = total_budget_ms - total_min_ms;
347 let mut current_vram = total_min_vram;
348
349 for &idx in &sorted_indices {
350 let negotiation = &negotiations[idx];
351 let current_cost_ms = allocations[idx].strategy.estimated_time.as_secs_f32() * 1000.0;
352 let current_vram_cost = allocations[idx].strategy.estimated_vram;
353
354 let mut best_upgrade: Option<&StrategyOption> = None;
355 for strategy in negotiation.strategies.iter().rev() {
356 let cost_ms = strategy.estimated_time.as_secs_f32() * 1000.0;
357 let delta_ms = cost_ms - current_cost_ms;
358 let delta_vram = strategy.estimated_vram.saturating_sub(current_vram_cost);
359
360 let time_fits = delta_ms <= remaining_ms;
361 let vram_fits = max_vram_bytes
362 .map(|max| current_vram + delta_vram <= max)
363 .unwrap_or(true);
364
365 if time_fits && vram_fits {
366 best_upgrade = Some(strategy);
367 break;
368 }
369 }
370
371 if let Some(upgrade) = best_upgrade {
372 let old_cost = current_cost_ms;
373 let new_cost = upgrade.estimated_time.as_secs_f32() * 1000.0;
374 let delta_vram = upgrade.estimated_vram.saturating_sub(current_vram_cost);
375
376 remaining_ms -= new_cost - old_cost;
377 current_vram += delta_vram;
378 allocations[idx].strategy = upgrade.clone();
379
380 log::trace!(
381 "GORNA: Upgraded {:?} from {:.2}ms to {:.2}ms (remaining={:.2}ms, vram={:.2}MB)",
382 negotiation.agent_id,
383 old_cost,
384 new_cost,
385 remaining_ms,
386 current_vram as f64 / (1024.0 * 1024.0)
387 );
388 }
389 }
390
391 if let Some(max_vram) = max_vram_bytes {
392 let total_vram: u64 = allocations.iter().map(|a| a.strategy.estimated_vram).sum();
393 log::debug!(
394 "GORNA: Total VRAM allocated: {:.2}MB / {:.2}MB",
395 total_vram as f64 / (1024.0 * 1024.0),
396 max_vram as f64 / (1024.0 * 1024.0)
397 );
398 }
399
400 allocations
401 }
402
403 fn get_agent_priority(&self, id: AgentId, phase: ExecutionPhase) -> f32 {
408 match phase {
409 ExecutionPhase::Boot => match id {
410 AgentId::Asset => 1.0,
411 _ => 0.3,
412 },
413 ExecutionPhase::Menu => match id {
414 AgentId::Renderer => 0.6,
415 AgentId::Asset => 1.0,
416 AgentId::Audio => 0.8,
417 _ => 0.3,
418 },
419 ExecutionPhase::Simulation => match id {
420 AgentId::Renderer => 1.0,
421 AgentId::Physics => 1.0,
422 AgentId::Ecs => 0.8,
423 AgentId::Audio => 0.6,
424 AgentId::Asset => 0.5,
425 },
426 ExecutionPhase::Background => 0.1, }
428 }
429
430 fn is_critical_agent(&self, id: AgentId, phase: ExecutionPhase) -> bool {
433 match phase {
434 ExecutionPhase::Boot => matches!(id, AgentId::Asset),
435 ExecutionPhase::Menu => matches!(id, AgentId::Renderer),
436 ExecutionPhase::Simulation => {
437 matches!(id, AgentId::Renderer | AgentId::Physics | AgentId::Ecs)
438 }
439 ExecutionPhase::Background => false,
440 }
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analysis::AnalysisReport;
    use crate::context::Context;
    use khora_core::agent::Agent;
    use khora_core::control::gorna::{
        AgentId, AgentStatus, NegotiationRequest, NegotiationResponse, ResourceBudget, StrategyId,
        StrategyOption,
    };
    use khora_core::EngineContext;

    /// Scripted agent. It always offers LowPower (2 ms), Balanced (8 ms) and
    /// HighPerformance (14 ms) and records the last budget it was given.
    struct MockAgent {
        id: AgentId,
        applied_budget: Option<ResourceBudget>,
        is_stalled: bool,
        health: f32,
    }

    impl MockAgent {
        fn new(id: AgentId) -> Self {
            Self {
                id,
                applied_budget: None,
                is_stalled: false,
                health: 1.0,
            }
        }

        fn stalled(id: AgentId) -> Self {
            Self {
                id,
                applied_budget: None,
                is_stalled: true,
                health: 0.0,
            }
        }
    }

    impl Agent for MockAgent {
        fn id(&self) -> AgentId {
            self.id
        }

        fn negotiate(&mut self, _request: NegotiationRequest) -> NegotiationResponse {
            NegotiationResponse {
                strategies: vec![
                    StrategyOption {
                        id: StrategyId::LowPower,
                        estimated_time: Duration::from_millis(2),
                        estimated_vram: 1024,
                    },
                    StrategyOption {
                        id: StrategyId::Balanced,
                        estimated_time: Duration::from_millis(8),
                        estimated_vram: 10 * 1024 * 1024,
                    },
                    StrategyOption {
                        id: StrategyId::HighPerformance,
                        estimated_time: Duration::from_millis(14),
                        estimated_vram: 20 * 1024 * 1024,
                    },
                ],
            }
        }

        fn apply_budget(&mut self, budget: ResourceBudget) {
            self.applied_budget = Some(budget);
        }

        fn update(&mut self, _context: &mut EngineContext<'_>) {}

        fn report_status(&self) -> AgentStatus {
            AgentStatus {
                agent_id: self.id,
                current_strategy: self
                    .applied_budget
                    .as_ref()
                    .map(|b| b.strategy_id)
                    .unwrap_or(StrategyId::Balanced),
                health_score: self.health,
                is_stalled: self.is_stalled,
                message: String::new(),
            }
        }

        fn execute(&mut self) {}

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
            self
        }
    }

    /// Runs `f` on the `MockAgent` stored behind `agent`.
    ///
    /// Uses the safe `Agent::as_any` downcast instead of casting the trait
    /// object pointer, so a wrong concrete type fails loudly rather than
    /// being undefined behaviour.
    fn with_mock<R>(agent: &Arc<Mutex<dyn Agent>>, f: impl FnOnce(&MockAgent) -> R) -> R {
        let guard = agent.lock().unwrap();
        let mock = guard
            .as_any()
            .downcast_ref::<MockAgent>()
            .expect("test agents must be MockAgent");
        f(mock)
    }

    /// Strategy of the budget applied to `agent`; panics if none was applied.
    fn applied_strategy(agent: &Arc<Mutex<dyn Agent>>) -> StrategyId {
        with_mock(agent, |mock| {
            mock.applied_budget
                .as_ref()
                .expect("Budget should be applied")
                .strategy_id
        })
    }

    fn normal_report() -> AnalysisReport {
        AnalysisReport {
            needs_negotiation: true,
            suggested_latency_ms: 16.66,
            death_spiral_detected: false,
            alerts: Vec::new(),
        }
    }

    fn simulation_ctx() -> Context {
        Context {
            phase: ExecutionPhase::Simulation,
            global_budget_multiplier: 1.0,
            ..Default::default()
        }
    }

    fn create_arbitrator() -> GornaArbitrator {
        GornaArbitrator::new(Duration::from_millis(100))
    }

    #[test]
    fn test_arbitrate_single_agent_gets_best_strategy() {
        let arbitrator = create_arbitrator();
        let ctx = simulation_ctx();
        let report = normal_report();
        let agent = MockAgent::new(AgentId::Renderer);
        let mut agents: Vec<Arc<Mutex<dyn Agent>>> = vec![Arc::new(Mutex::new(agent))];

        arbitrator.arbitrate(&ctx, &report, &mut agents);

        assert_eq!(applied_strategy(&agents[0]), StrategyId::HighPerformance);
    }

    #[test]
    fn test_arbitrate_respects_global_budget() {
        let arbitrator = create_arbitrator();
        let ctx = simulation_ctx();
        let report = normal_report();

        let renderer = MockAgent::new(AgentId::Renderer);
        let physics = MockAgent::new(AgentId::Physics);
        let mut agents: Vec<Arc<Mutex<dyn Agent>>> = vec![
            Arc::new(Mutex::new(renderer)),
            Arc::new(Mutex::new(physics)),
        ];

        arbitrator.arbitrate(&ctx, &report, &mut agents);

        for agent in &agents {
            assert!(with_mock(agent, |mock| mock.applied_budget.is_some()));
        }

        let total_cost_ms: f64 = agents
            .iter()
            .map(|agent| {
                with_mock(agent, |mock| {
                    mock.applied_budget
                        .as_ref()
                        .unwrap()
                        .time_limit
                        .as_secs_f64()
                        * 1000.0
                })
            })
            .sum();
        assert!(
            total_cost_ms <= 16.66 + 0.1,
            "Total cost {:.2}ms exceeds budget 16.66ms",
            total_cost_ms
        );
    }

    #[test]
    fn test_arbitrate_thermal_reduces_budget() {
        let arbitrator = create_arbitrator();
        let mut ctx = simulation_ctx();
        ctx.hardware.thermal = khora_core::platform::ThermalStatus::Throttling;
        ctx.refresh_budget_multiplier();
        let mut report = normal_report();
        report.suggested_latency_ms = 33.33;
        let agent = MockAgent::new(AgentId::Renderer);
        let mut agents: Vec<Arc<Mutex<dyn Agent>>> = vec![Arc::new(Mutex::new(agent))];

        arbitrator.arbitrate(&ctx, &report, &mut agents);

        assert_eq!(applied_strategy(&agents[0]), StrategyId::HighPerformance);
    }

    #[test]
    fn test_emergency_stop_on_death_spiral() {
        let arbitrator = create_arbitrator();
        let ctx = simulation_ctx();
        let mut report = normal_report();
        report.death_spiral_detected = true;

        let renderer = MockAgent::new(AgentId::Renderer);
        let physics = MockAgent::new(AgentId::Physics);
        let mut agents: Vec<Arc<Mutex<dyn Agent>>> = vec![
            Arc::new(Mutex::new(renderer)),
            Arc::new(Mutex::new(physics)),
        ];

        arbitrator.arbitrate(&ctx, &report, &mut agents);

        for agent in &agents {
            assert_eq!(applied_strategy(agent), StrategyId::LowPower);
        }
    }

    #[test]
    fn test_emergency_stop_on_stalled_agents() {
        let arbitrator = create_arbitrator();
        let ctx = simulation_ctx();
        let report = normal_report();

        let stalled1 = MockAgent::stalled(AgentId::Renderer);
        let stalled2 = MockAgent::stalled(AgentId::Physics);
        let mut agents: Vec<Arc<Mutex<dyn Agent>>> = vec![
            Arc::new(Mutex::new(stalled1)),
            Arc::new(Mutex::new(stalled2)),
        ];

        arbitrator.arbitrate(&ctx, &report, &mut agents);

        for agent in &agents {
            assert_eq!(applied_strategy(agent), StrategyId::LowPower);
        }
    }

    #[test]
    fn test_arbitrate_empty_agents() {
        let arbitrator = create_arbitrator();
        let ctx = simulation_ctx();
        let report = normal_report();
        let mut agents: Vec<Arc<Mutex<dyn Agent>>> = vec![];

        // Must simply return without panicking.
        arbitrator.arbitrate(&ctx, &report, &mut agents);
    }

    #[test]
    fn test_priority_order_renderer_before_asset_in_simulation() {
        let arbitrator = create_arbitrator();
        let ctx = simulation_ctx();

        // 10 ms leaves 6 ms after both cheapest strategies (2 ms + 2 ms):
        // enough for exactly one Balanced upgrade, which must go to the
        // higher-priority Renderer.
        let mut tight_report = normal_report();
        tight_report.suggested_latency_ms = 10.0;

        let renderer = MockAgent::new(AgentId::Renderer);
        let asset = MockAgent::new(AgentId::Asset);
        let mut agents: Vec<Arc<Mutex<dyn Agent>>> =
            vec![Arc::new(Mutex::new(renderer)), Arc::new(Mutex::new(asset))];

        arbitrator.arbitrate(&ctx, &tight_report, &mut agents);

        assert_eq!(applied_strategy(&agents[0]), StrategyId::Balanced);
        assert_eq!(applied_strategy(&agents[1]), StrategyId::LowPower);
    }

    #[test]
    fn test_background_phase_minimal_priority() {
        let arbitrator = create_arbitrator();
        assert!(arbitrator.get_agent_priority(AgentId::Renderer, ExecutionPhase::Background) < 0.2);
        assert!(arbitrator.get_agent_priority(AgentId::Physics, ExecutionPhase::Background) < 0.2);
    }

    #[test]
    fn test_simulation_critical_agents() {
        let arbitrator = create_arbitrator();
        assert!(arbitrator.is_critical_agent(AgentId::Renderer, ExecutionPhase::Simulation));
        assert!(arbitrator.is_critical_agent(AgentId::Physics, ExecutionPhase::Simulation));
        assert!(arbitrator.is_critical_agent(AgentId::Ecs, ExecutionPhase::Simulation));
        assert!(!arbitrator.is_critical_agent(AgentId::Audio, ExecutionPhase::Simulation));
    }
}