// khora_agents/ecs_agent/garbage_collector_agent.rs
use std::collections::VecDeque;
22use std::time::Duration;
23
24use crossbeam_channel::Sender;
25use khora_core::agent::Agent;
26use khora_core::control::gorna::{
27 AgentId, AgentStatus, NegotiationRequest, NegotiationResponse, ResourceBudget, StrategyId,
28 StrategyOption,
29};
30use khora_core::telemetry::event::TelemetryEvent;
31use khora_data::ecs::{PageIndex, SemanticDomain, World, WorldMaintenance};
32use khora_lanes::ecs_lane::{CompactionLane, GcWorkPlan};
33
/// Per-queue drain cap used under the `Balanced` strategy and at construction time.
const DEFAULT_MAX_CLEANUP_PER_FRAME: usize = 10;

/// Factor applied to the default cap when the `HighPerformance` strategy is selected.
const HIGH_PERFORMANCE_CLEANUP_MULTIPLIER: usize = 3;

/// Divisor applied to the default cap when the `LowPower` strategy is selected.
const LOW_POWER_CLEANUP_DIVISOR: usize = 4;
37
38pub struct GarbageCollectorAgent {
40 pending_cleanup: VecDeque<(PageIndex, SemanticDomain)>,
41 pending_vacuum: VecDeque<(u32, u32)>,
42 compaction_lane: CompactionLane,
43 current_strategy: StrategyId,
44 max_cleanup_per_frame: usize,
45 last_cleanup_count: usize,
46 frame_count: u64,
47 telemetry_sender: Option<Sender<TelemetryEvent>>,
48}
49
50impl Agent for GarbageCollectorAgent {
51 fn id(&self) -> AgentId {
52 AgentId::Ecs
53 }
54
55 fn negotiate(&mut self, _request: NegotiationRequest) -> NegotiationResponse {
56 let pending_count = self.pending_cleanup.len() + self.pending_vacuum.len();
57 let urgency_factor = (pending_count as f32 / 100.0).min(5.0);
58
59 NegotiationResponse {
60 strategies: vec![
61 StrategyOption {
62 id: StrategyId::LowPower,
63 estimated_time: Duration::from_micros(50),
64 estimated_vram: 0,
65 },
66 StrategyOption {
67 id: StrategyId::Balanced,
68 estimated_time: Duration::from_micros((100.0 * (1.0 + urgency_factor)) as u64),
69 estimated_vram: 0,
70 },
71 StrategyOption {
72 id: StrategyId::HighPerformance,
73 estimated_time: Duration::from_micros((500.0 * (1.0 + urgency_factor)) as u64),
74 estimated_vram: 0,
75 },
76 ],
77 }
78 }
79
80 fn apply_budget(&mut self, budget: ResourceBudget) {
81 log::info!(
82 "GarbageCollectorAgent: Strategy update to {:?}",
83 budget.strategy_id,
84 );
85
86 self.current_strategy = budget.strategy_id;
87
88 self.max_cleanup_per_frame = match budget.strategy_id {
89 StrategyId::LowPower => {
90 (DEFAULT_MAX_CLEANUP_PER_FRAME / LOW_POWER_CLEANUP_DIVISOR).max(1)
91 }
92 StrategyId::Balanced => DEFAULT_MAX_CLEANUP_PER_FRAME,
93 StrategyId::HighPerformance => {
94 DEFAULT_MAX_CLEANUP_PER_FRAME * HIGH_PERFORMANCE_CLEANUP_MULTIPLIER
95 }
96 StrategyId::Custom(factor) => (factor as usize).clamp(1, 100),
97 };
98 }
99
100 fn update(&mut self, context: &mut khora_core::EngineContext<'_>) {
101 if let Some(world_any) = context.world.as_deref_mut() {
102 if let Some(world) = world_any.downcast_mut::<World>() {
103 self.run(world);
104 }
105 }
106 self.frame_count += 1;
107 }
108
109 fn report_status(&self) -> AgentStatus {
110 let pending = self.pending_cleanup.len() + self.pending_vacuum.len();
111 let health_score = if pending == 0 {
112 1.0
113 } else if pending < 100 {
114 0.8
115 } else if pending < 500 {
116 0.5
117 } else {
118 0.2
119 };
120
121 AgentStatus {
122 agent_id: self.id(),
123 health_score,
124 current_strategy: self.current_strategy,
125 is_stalled: false,
126 message: format!(
127 "pending_cleanup={} pending_vacuum={} last_cleaned={}",
128 self.pending_cleanup.len(),
129 self.pending_vacuum.len(),
130 self.last_cleanup_count,
131 ),
132 }
133 }
134
135 fn execute(&mut self) {
136 }
138
139 fn as_any(&self) -> &dyn std::any::Any {
140 self
141 }
142
143 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
144 self
145 }
146}
147
148impl GarbageCollectorAgent {
149 pub fn new() -> Self {
151 Self {
152 pending_cleanup: VecDeque::new(),
153 pending_vacuum: VecDeque::new(),
154 compaction_lane: CompactionLane::new(),
155 current_strategy: StrategyId::Balanced,
156 max_cleanup_per_frame: DEFAULT_MAX_CLEANUP_PER_FRAME,
157 last_cleanup_count: 0,
158 frame_count: 0,
159 telemetry_sender: None,
160 }
161 }
162
163 pub fn with_dcc_sender(mut self, sender: Sender<TelemetryEvent>) -> Self {
165 self.telemetry_sender = Some(sender);
166 self
167 }
168
169 pub fn queue_cleanup(&mut self, page_index: PageIndex, domain: SemanticDomain) {
171 self.pending_cleanup.push_back((page_index, domain));
172 }
173
174 pub fn queue_vacuum(&mut self, page_index: u32, hole_row_index: u32) {
176 self.pending_vacuum.push_back((page_index, hole_row_index));
177 }
178
179 pub fn run(&mut self, world: &mut World) {
181 if self.pending_cleanup.is_empty() && self.pending_vacuum.is_empty() {
182 self.last_cleanup_count = 0;
183 return;
184 }
185
186 let budget = self.max_cleanup_per_frame;
187
188 let items_to_clean: Vec<_> = self
189 .pending_cleanup
190 .drain(..budget.min(self.pending_cleanup.len()))
191 .collect();
192
193 let pages_to_vacuum: Vec<_> = self
194 .pending_vacuum
195 .drain(..budget.min(self.pending_vacuum.len()))
196 .collect();
197
198 let items_count = items_to_clean.len();
199 let pages_count = pages_to_vacuum.len();
200 self.last_cleanup_count = items_count + pages_count;
201
202 if self.last_cleanup_count == 0 {
203 return;
204 }
205
206 let work_plan = GcWorkPlan {
207 budget,
208 items_to_clean,
209 pages_to_vacuum,
210 };
211
212 self.compaction_lane
213 .run(world as &mut dyn WorldMaintenance, &work_plan);
214
215 log::trace!(
216 "GarbageCollectorAgent: Cleaned {} items, {} pages vacuumed (strategy={:?})",
217 items_count,
218 pages_count,
219 self.current_strategy,
220 );
221 }
222
223 pub fn pending_count(&self) -> usize {
225 self.pending_cleanup.len() + self.pending_vacuum.len()
226 }
227
228 pub fn current_strategy(&self) -> StrategyId {
230 self.current_strategy
231 }
232
233 pub fn max_cleanup_per_frame(&self) -> usize {
235 self.max_cleanup_per_frame
236 }
237}
238
239impl Default for GarbageCollectorAgent {
240 fn default() -> Self {
241 Self::new()
242 }
243}