debezium/dbz#2040 feat: add core SyncManager for routing CDC operations#7
Open
KMohnishM wants to merge 3 commits into
Open
debezium/dbz#2040 feat: add core SyncManager for routing CDC operations#7KMohnishM wants to merge 3 commits into
KMohnishM wants to merge 3 commits into
Conversation
7fe87d1 to
c7314b2
Compare
dfbb3b5 to
76fde56
Compare
c7314b2 to
d5eb1f7
Compare
76fde56 to
c15ad5d
Compare
d5eb1f7 to
85f8f5b
Compare
c15ad5d to
ebc1034
Compare
85f8f5b to
f615937
Compare
ebc1034 to
8e320ff
Compare
f615937 to
9af708f
Compare
8e320ff to
7be6bd2
Compare
9af708f to
43611d6
Compare
7be6bd2 to
86d3b29
Compare
43611d6 to
edbff15
Compare
86d3b29 to
c4ad477
Compare
edbff15 to
52fc55a
Compare
c4ad477 to
2498e33
Compare
vjuranek
reviewed
Jun 18, 2026
vjuranek
left a comment
Member
There was a problem hiding this comment.
Two minor comments, otherwise LGTM
| """Thread-safe Dead Letter Queue (DLQ) to hold failed events.""" | ||
|
|
||
| def __init__(self) -> None: | ||
| self._queue: queue.Queue[tuple[DebeziumEventModel, Exception]] = queue.Queue() |
Member
There was a problem hiding this comment.
I'd suggest to have some limit on the queue not to consume infinite amount of memory.
Contributor
Author
There was a problem hiding this comment.
I've added a configurable max_size parameter (defaulting to 1000) to the DeadLetterQueue constructor. If the queue fills up, new failed events are safely dropped with a warning logged to prevent out-of-memory errors on high-volume CDC failure streams. Added a test_dlq_max_size_limit unit test to cover this behavior.
Comment on lines
+173
to
+191
| if self.soft_delete: | ||
| page_content, row_metadata = self.document_builder.projection_policy.project(event) | ||
| system_meta: dict[str, Any] = { | ||
| "_table": event.table_name, | ||
| "_schema": event.schema_name, | ||
| "_op": op, | ||
| "_doc_id": doc_id, | ||
| "_is_deleted": True, | ||
| } | ||
| if event.payload.ts_ms is not None: | ||
| system_meta["_ts_ms"] = event.payload.ts_ms | ||
|
|
||
| metadata = {**row_metadata, **self.document_builder.extra_metadata, **system_meta} | ||
| metadata = self.document_builder._sanitize_metadata(metadata) | ||
|
|
||
| document = Document( | ||
| page_content=page_content, | ||
| metadata=metadata, | ||
| id=doc_id, |
Member
There was a problem hiding this comment.
Posibly move this into DocumentBuilder?
Signed-off-by: Mohnish <kmohnishm@gmail.com>
…iggers Signed-off-by: Mohnish <kmohnishm@gmail.com>
…sanitizer API Signed-off-by: KMohnishM <kmohnishm@gmail.com>
52fc55a to
2f3f212
Compare
2498e33 to
46949ad
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes debezium/dbz#2040
This pull request introduces the core Synchronization Layer (
SyncManager) for Week 3 of the GSoC CDC project. It establishes the event routing engine, retry mechanism, and Dead Letter Queue (DLQ).Key Changes
pydebeziumai/sync/manager.py):c/r)upsertu)deletethenupsert(avoiding dangling metadata or duplicate embeddings)d)delete/ soft-delete update.soft_delete=True, updating metadata with_is_deleted=Trueusing the deleted row'sbeforepayload state.RetryConfigproviding exponential backoff retries with randomized jitter for transient connection errors.DeadLetterQueueto hold failed events when maximum retries are exceeded.tests/unit/test_sync_manager.py): Fully unit-tested the manager class, covering operations routing, soft delete configurations, retry policies (mocking sleep), and DLQ routing.branches: [main]filter from the CI and commit-signoff workflows, allowing these validation checks to run on pull requests targeting stacked branches.Verification Results
All checks passed locally under WSL:
Success: no issues found in 25 source files)SyncManager)