Add server ingestion OOM protection#18784
Conversation
6c03ba0 to
017ec1e
Compare
There was a problem hiding this comment.
Pull request overview
Adds server-local OOM protection/backpressure for realtime ingestion by sampling JVM heap usage and temporarily pausing stream fetches when usage exceeds configurable thresholds. This integrates into the server realtime consume loop, adds server + table override configs, and exposes new table-level metrics for observability.
Changes:
- Introduces
ServerIngestionOomProtectionManagerand wires it into realtime consumption (gated inINITIAL_CONSUMINGonly). - Adds server instance config keys + table-level override config (
serverIngestionOomProtectionConfig) with validation + SerDe/tests. - Adds new server metrics (gauges + meter) and updates the upsert realtime example docs/config.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json | Adds table-level override example for server ingestion OOM protection. |
| pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/README.md | Documents server + table configs and behavior for the example. |
| pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java | Adds server instance config keys/defaults for OOM protection. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java | Adds table-level override field for OOM protection in stream ingestion config. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/ServerIngestionOomProtectionConfig.java | New table-level config object for OOM protection mode/threshold overrides. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java | Validates table-level OOM protection thresholds in ingestion config validation. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java | Adds validation tests for OOM protection config constraints. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManager.java | New manager implementing heap-sampling + hysteresis + metrics/logging. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java | Creates and resets the OOM protection manager per realtime table. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java | Applies protection gating in consume loop for INITIAL_CONSUMING segments. |
| pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManagerTest.java | Adds unit tests for policy/override behavior and wait/metrics paths. |
| pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java | Adds integration-style tests asserting gating is applied/skipped by state. |
| pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java | Adds new table gauges for protection active + heap usage percent. |
| pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java | Adds new table meter for throttling occurrences. |
| pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java | Adds SerDe assertions for the new table-level config object. |
cb580c4 to
a230375
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18784 +/- ##
============================================
+ Coverage 57.00% 64.82% +7.81%
- Complexity 1 1319 +1318
============================================
Files 2599 3390 +791
Lines 151667 210435 +58768
Branches 24564 32986 +8422
============================================
+ Hits 86459 136413 +49954
- Misses 57884 63026 +5142
- Partials 7324 10996 +3672
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
2264fdc to
1924588
Compare
1924588 to
ecb7f10
Compare
95da13e to
f32d50b
Compare
f32d50b to
bd76b67
Compare
xiangfu0
left a comment
There was a problem hiding this comment.
Found one high-signal issue; see inline comment.
b358136 to
f20a57f
Compare
f20a57f to
261dce0
Compare
| @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, | ||
| BooleanSupplier isServerReadyToConsumeData, | ||
| BooleanSupplier isServerReadyToServeQueries, | ||
| ServerIngestionOomProtectionManager.ServerThrottleState serverIngestionOomProtectionThrottleState, |
There was a problem hiding this comment.
Same compatibility issue here for pinot.server.instance.table.data.manager.provider.class: adding the new parameter to the interface method breaks existing custom providers at runtime. Pinot already supports external provider implementations via config, so this needs a compatibility shim rather than a signature rewrite.
| @Nullable SegmentOperationsThrottlerSet segmentOperationsThrottlerSet, | ||
| ServerReloadJobStatusCache reloadJobStatusCache) | ||
| ServerReloadJobStatusCache reloadJobStatusCache, | ||
| ServerIngestionOomProtectionManager.ServerThrottleState serverIngestionOomProtectionThrottleState) |
There was a problem hiding this comment.
This widens an existing extension-point signature used by pinot.server.instance.data.manager.class. Any custom InstanceDataManager compiled against the current release will still load, but the first init(...) call on an upgraded server will hit AbstractMethodError because the old 5-arg method no longer satisfies the interface. Please preserve the old signature and thread the throttle state through a backward-compatible overload or setter instead of rewriting the contract in place.
Summary
This PR adds server ingestion OOM protection for realtime ingestion. When protection is enabled for a consuming
segment, the server checks JVM heap usage before fetching more realtime messages. If heap usage is at or above the
configured throttle threshold, the consuming loop waits locally and resumes after heap usage drops to the recovery
threshold.
The protection uses Pinot's existing JVM heap usage tracking (
ResourceUsageUtils, backed byMemoryMXBean), which isthe same process-level heap source used by query OOM protection/accounting. It adds a separate realtime-ingestion
backpressure gate on top of that shared heap signal; it does not reuse per-query attribution or query kill logic.
The implementation keeps heap sampling, hysteresis, GC requests, and metrics in a single server-wide throttle state.
Each consuming thread only checks whether its table is enrolled and then reads the shared server throttle status, so the
per-table work stays light.
Default Behavior
With no config changes, Pinot servers do not hold or stop realtime ingestion for this feature.
Effective defaults:
DISABLE0.950.901000 ms30000 msDefault runtime behavior:
DISABLEor a table explicitly setsmodetoENABLE.INITIAL_CONSUMING; catch-up paths such asCATCHING_UPandCONSUMING_TO_ONLINEare not gated.ingestion-heavy server staying paused only because reclaimable objects are still counted as used heap.
mode=ENABLEopts a table in even when the server mode isDISABLEor would otherwise skip the table.mode=DISABLEopts a table out even when the server mode would protect it.Server Configuration
Server properties use the user-facing prefix
pinot.server.instance.. They can be supplied in the server instance configat startup, and they are also dynamic cluster configs: updating the same keys through the cluster config updates the
shared server throttle state at runtime without a server restart.
Cluster config values override the boot-time server instance config while present. Removing a cluster config key falls
back to the boot-time instance config value, or to the default if the instance config did not set it.
Enable Upsert And Dedup Protection
Update Through Cluster Config
Use the same full
pinot.server.instance.*names when updating cluster config:To disable the server-level policy dynamically:
Server Config Reference
pinot.server.instance.ingestion.oom.protection.modeDISABLEENABLE,UPSERT_DEDUP_ONLY,DISABLE.pinot.server.instance.ingestion.oom.protection.heapUsageThrottleThreshold0.95pinot.server.instance.ingestion.oom.protection.heapUsageRecoveryThreshold0.90heapUsageThrottleThreshold.pinot.server.instance.ingestion.oom.protection.checkIntervalMs1000pinot.server.instance.ingestion.oom.protection.gcIntervalMs300000or a negative value to disable the explicit GC request.Server Modes
Tune Server-Wide Thresholds
Thresholds are server-level only. They are intentionally not table-level knobs.
Table Configuration
Tables can only opt in or opt out under
ingestionConfig.streamIngestionConfig.oomProtectionConfig. Thresholds areconfigured on the server and shared by all enrolled realtime consuming threads.
If table
modeis unset, the table follows the server mode. Tablemodesupports onlyENABLEandDISABLE:ENABLE: protect this realtime table even when the server mode isDISABLEor would otherwise skip it.DISABLE: turn protection off for this realtime table even when the server mode would protect it.Table Config Reference
modeENABLE,DISABLE. Unset follows the server mode.Force Protection On For One Realtime Table
Use this to opt in a specific realtime table, including when the server mode is
DISABLEorUPSERT_DEDUP_ONLYwouldskip a non-upsert/non-dedup table.
{ "ingestionConfig": { "streamIngestionConfig": { "streamConfigMaps": [ { "streamType": "kafka" } ], "oomProtectionConfig": { "mode": "ENABLE" } } } }Disable Protection For One Table
Runtime Behavior
When protection is active for a consuming segment, the server waits before the next stream fetch. This reduces additional
heap pressure from realtime ingestion while leaving segment state, stream offsets, and table pause state unchanged.
The heap sample is the same process-level JVM heap usage used by query OOM protection. The ingestion feature does not
reuse per-query memory attribution or query kill logic; it only uses the shared heap usage signal to decide whether
realtime ingestion should wait.
Heap sampling, hysteresis, GC requests, and the active/heap gauges are tracked centrally per server. A consuming thread
does not compute table-specific thresholds. It checks local enrollment (
modeplus server policy) and then observes theshared server throttle state.
While throttled, Pinot requests JVM GC at a server-wide, rate-limited interval. This matters for ingestion-heavy servers:
once ingestion stops allocating, the JVM might not naturally trigger another collection immediately, so the reported used
heap can stay above the recovery threshold even when garbage is reclaimable. The explicit GC request is a JVM hint, just
like the query OOM pause path; JVM flags such as
-XX:+DisableExplicitGCcan still ignore it.While waiting, the consume loop re-checks stop and segment end criteria at each interval, so force commit, time limit,
and stop paths do not wait indefinitely for heap recovery.
Metrics
This PR adds server metrics for visibility:
REALTIME_INGESTION_OOM_PROTECTION_ACTIVE: global gauge set to1when the server-wide realtime ingestion throttleis active, otherwise
0.REALTIME_INGESTION_OOM_PROTECTION_HEAP_USAGE_PERCENT: global gauge containing the latest JVM heap usage percentageobserved by the shared server throttle state.
Example Updated
The upsert meetup RSVP realtime example now includes a table opt-in sample and a short operator guide:
pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.jsonpinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/README.mdTesting
./mvnw -pl pinot-core -am -Dtest=ServerIngestionOomProtectionManagerTest -Dsurefire.failIfNoSpecifiedTests=false test./mvnw -pl pinot-core,pinot-server -am -Dtest=ServerIngestionOomProtectionManagerTest,RealtimeSegmentDataManagerTest -Dsurefire.failIfNoSpecifiedTests=false test./mvnw -pl pinot-segment-local,pinot-common,pinot-core -am -Dtest=TableConfigUtilsTest,TableConfigSerDeUtilsTest,RealtimeSegmentDataManagerTest -Dsurefire.failIfNoSpecifiedTests=false test./mvnw -pl pinot-plugins/pinot-metrics/pinot-yammer -am -Dtest=YammerServerPrometheusMetricsTest -Dsurefire.failIfNoSpecifiedTests=false test./mvnw spotless:apply -pl pinot-core,pinot-server,pinot-tools./mvnw license:format -pl pinot-core,pinot-server,pinot-tools./mvnw checkstyle:check -pl pinot-core,pinot-server,pinot-tools./mvnw license:check -pl pinot-core,pinot-server,pinot-toolsgit diff --check