DreamDBv0.2.0bec026

Sharded Ingest via Branch+Union-Merge

Status (2026-05-18): this document is normative for the v0 reference implementation of MergeStrategy::UnionTracks (the fused-merge approach). The protocol-level framing — that layered-merge and fused-merge are both valid resolutions for multi-parent Manifests — is in spec/0008 §5.3. A future revision MAY promote the algorithm here to its own spec number.

2026-05-18. This implements B2 from design/0006-10b-scale-blockers.md. The other six blockers (B1, B5, B3, B4, B8, B6) have shipped; B2 is the last execution-side blocker before 10B-scale is structurally ready. This doc captures the algorithm so the implementation slice can run end-to-end.

Problem

A single-process Dataset::append_many ingests ~120 CLIP samples/second (image encode dominates). 10B images at that rate = 99 days. 100B = 990 days. Unacceptable.

The fix is the standard data-parallel pattern: N workers each ingest a disjoint slice into a personal branch, and an orchestrator merges the branches back into trunk. The DreamDB Protocol's content-addressing makes this safe (no double-write conflicts), but it doesn't free the operator from implementing the merge correctly.

Why FastForward isn't sufficient

The existing Dataset::merge(branch, MergeStrategy::FastForward) works when the branch is a strict descendant of trunk. For sharded ingest:

                     T_w0   (worker 0 appended slice 0)
                    /
    trunk@T0 ------+------ T_w1   (worker 1 appended slice 1)
                    \
                     T_w2   (worker 2 appended slice 2)

After fork, every branch IS a descendant of trunk@T0 → first FF works:

trunk@T0 → FF(w0) → trunk@T_w0

But now trunk is at T_w0, while w1's parent is T0, not T_w0. The second FF fails because w1 is NOT a descendant of T_w0 (it's a sibling). This is the diverged-history case that requires a real merge.
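
The failure can be reproduced with a minimal ancestry check. This is a sketch only: the `Hash` alias, `is_descendant`, and `example_history` are hypothetical names for illustration, not the `Dataset::merge` internals, and the history is modeled as a plain first-parent map.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a Manifest content hash.
type Hash = &'static str;

/// Returns true if `tip` is `target` or reaches it by following first parents.
/// `parents` maps each Manifest hash to its first parent (None for the root).
fn is_descendant(tip: Hash, target: Hash, parents: &HashMap<Hash, Option<Hash>>) -> bool {
    let mut cur = Some(tip);
    while let Some(h) = cur {
        if h == target {
            return true;
        }
        // Unknown hashes and roots both end the walk.
        cur = parents.get(h).copied().flatten();
    }
    false
}

/// Builds the three-worker history from the diagram above.
fn example_history() -> HashMap<Hash, Option<Hash>> {
    HashMap::from([
        ("T0", None),
        ("T_w0", Some("T0")),
        ("T_w1", Some("T0")),
        ("T_w2", Some("T0")),
    ])
}
```

With this history, `is_descendant("T_w1", "T0", ..)` holds, so the first fast-forward is legal, while `is_descendant("T_w1", "T_w0", ..)` does not, which is the condition the second fast-forward trips over.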

The merge algorithm (3-way union, single branch)

Given:

  • Trunk's current Manifest T_self.
  • Branch's current Manifest T_other.
  • LCA (lowest common ancestor) T_lca, found by walking parents[0] on both sides and finding the first hash in both chains.
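
The LCA walk described in the last bullet might look roughly like the sketch below. It assumes Manifests can be looked up by hash in an in-memory map; the `Hash` alias, `first_parent_chain`, and the map layout are illustrative assumptions and are not the shipped `find_lca` in merge.rs. The step bound mirrors the 1000-step refusal rule.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for a Manifest content hash.
type Hash = u64;

/// Give up after this many iterations per side.
const MAX_STEPS: usize = 1000;

/// Collects the parents[0] chain starting at `tip` (tip included).
/// Returns None if no root is reached within MAX_STEPS iterations.
fn first_parent_chain(tip: Hash, parents: &HashMap<Hash, Vec<Hash>>) -> Option<Vec<Hash>> {
    let mut chain = vec![tip];
    let mut cur = tip;
    for _ in 0..MAX_STEPS {
        match parents.get(&cur).and_then(|p| p.first()) {
            Some(&next) => {
                chain.push(next);
                cur = next;
            }
            None => return Some(chain), // reached a root or an unknown hash
        }
    }
    None
}

/// Returns the first hash on b's chain that also appears on a's chain.
fn find_lca(a: Hash, b: Hash, parents: &HashMap<Hash, Vec<Hash>>) -> Option<Hash> {
    let chain_a: HashSet<Hash> = first_parent_chain(a, parents)?.into_iter().collect();
    first_parent_chain(b, parents)?
        .into_iter()
        .find(|h| chain_a.contains(h))
}
```

Because both chains are linear, the first shared hash found from one side is also the first shared hash from the other, so the order of the two arguments does not change the result.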

For each TrackEntry in each Manifest's tracks:

  1. Find the corresponding TrackEntry in T_self, T_other, T_lca (same (timeline, modality) key).
  2. Refuse if any modality is present on one side and absent on the other (schema divergence; out of scope for v0).
  3. Refuse if the modality's SpatialIndex hash differs across the three (catastrophic per project_collab_disciplines.md).
  4. Walk the three Tracks' object_index:

For SpatialBucket tracks:

  • Build cells: HashMap<SpatialKey, (lca_entry, self_entry, other_entry)> from the union of all three.
  • For each cell key k:
    • If self_entry.bucket_address == lca_entry.bucket_address AND other_entry.bucket_address == lca_entry.bucket_address → cell unchanged everywhere. Use LCA's entry.
    • If self_entry.bucket_address == lca_entry.bucket_address only → branch touched this cell. Use other_entry.
    • If other_entry.bucket_address == lca_entry.bucket_address only → trunk touched this cell. Use self_entry.
    • Both differ from LCA → merge buckets (see below).
  • Emit a new TrackEntry with the merged cells.
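
The four per-cell cases above reduce to one comparison against the LCA address on each side. The sketch below is illustrative only: `Addr`, `CellResolution`, and `resolve_cell` are hypothetical names, and representing a cell that is absent from one Manifest as `None` is an assumption the text does not spell out.

```rust
/// Hypothetical stand-in for a bucket's content address.
type Addr = u64;

/// Outcome of the three-way comparison for one cell key.
#[derive(Debug, PartialEq)]
enum CellResolution {
    /// Neither side changed the cell; keep the LCA entry.
    Unchanged(Option<Addr>),
    /// Only the branch changed the cell; take its entry.
    UseOther(Option<Addr>),
    /// Only trunk changed the cell; take its entry.
    UseSelf(Option<Addr>),
    /// Both sides changed the cell; the bucket-merge subroutine must run.
    MergeBuckets {
        self_addr: Option<Addr>,
        other_addr: Option<Addr>,
    },
}

/// Classifies one cell from its bucket address in LCA, trunk, and branch.
/// `None` means the cell does not exist in that Manifest (an assumption).
fn resolve_cell(lca: Option<Addr>, trunk: Option<Addr>, branch: Option<Addr>) -> CellResolution {
    match (trunk == lca, branch == lca) {
        (true, true) => CellResolution::Unchanged(lca),
        (true, false) => CellResolution::UseOther(branch),
        (false, true) => CellResolution::UseSelf(trunk),
        (false, false) => CellResolution::MergeBuckets {
            self_addr: trunk,
            other_addr: branch,
        },
    }
}
```

Under the `None` convention, a cell that only one worker created falls into the "one side touched it" case without any bucket fetch, which is the common path for disjoint slices.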

For Fragment tracks:

  • Concatenate entries from self.entries.diff(lca) and other.entries.diff(lca). Sort by t_start. No record-level merge needed — fragments are independent.

For ScalarBucket tracks:

  • Per scalar value, union the anchor lists from each side's bucket; write a new bucket if both sides modified the same value's bucket; otherwise use whichever side touched it.

For Constant tracks:

  • Refuse if the Constant Objects differ (operator must pick a winner explicitly).

Bucket-merge subroutine (the actually-novel piece)

For a cell k modified on both sides:

  1. Fetch bucket_self = SpatialBucket::from_bytes(connector.get(self_entry.bucket_address)?).
  2. Fetch bucket_other = SpatialBucket::from_bytes(connector.get(other_entry.bucket_address)?).
  3. Fetch bucket_lca = SpatialBucket::from_bytes(connector.get(lca_entry.bucket_address)?).
  4. Reuse append_many's existing consolidation logic (lines ~1100-1250 of dataset.rs):
    • Take the records in bucket_self ∪ bucket_other, keyed by time_anchor. Two appended records with an equal time_anchor mean literally the same Item was appended twice; refuse loudly, because that means the slice assignment was wrong.
    • Write the merged bucket as a new Object.
  5. Build new SpatialBucketEntry with the merged bucket's hash.
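
A minimal sketch of the record-level union follows. It is not the consolidation code in dataset.rs: the `Record` struct, `MergeError`, and `merge_buckets` are hypothetical, and it assumes that bucket_lca is used to tell records inherited from the common ancestor apart from records appended on each side, which the steps above imply but do not state. Sorting the output by time_anchor is also an assumption.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical record: a time anchor plus an opaque payload.
#[derive(Clone, Debug, PartialEq)]
struct Record {
    time_anchor: u64,
    payload: Vec<u8>,
}

#[derive(Debug, PartialEq)]
enum MergeError {
    /// Two sides appended a record with the same time_anchor.
    DuplicateTimeAnchor(u64),
}

/// Unions the records of both sides on top of the LCA bucket.
/// Records already present in the LCA are inherited, not conflicts.
fn merge_buckets(
    lca: &[Record],
    trunk: &[Record],
    branch: &[Record],
) -> Result<Vec<Record>, MergeError> {
    let inherited: BTreeSet<u64> = lca.iter().map(|r| r.time_anchor).collect();
    let mut merged: BTreeMap<u64, Record> = BTreeMap::new();
    for r in lca {
        merged.insert(r.time_anchor, r.clone());
    }
    for side in [trunk, branch] {
        // Only records appended after the fork can collide.
        for r in side.iter().filter(|r| !inherited.contains(&r.time_anchor)) {
            if merged.insert(r.time_anchor, r.clone()).is_some() {
                return Err(MergeError::DuplicateTimeAnchor(r.time_anchor));
            }
        }
    }
    // BTreeMap iteration yields records ordered by time_anchor.
    Ok(merged.into_values().collect())
}
```

The refusal path returns the offending time_anchor so the operator can trace which slice boundaries overlapped.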

Manifest construction

new_manifest = Manifest {
    parents: vec![T_self, T_other],
    timelines: ..., // inherit from T_self
    tracks: TracksField::Inline(new_track_entries),
    ts: now_unix_ns(),
    writer: "dreamdb-dataset/0.1#trunk#union-merge",
    registry: T_self.registry.clone(),  // assumes Schema unchanged across branches
}

The multi-parent parents field is the single load-bearing CBOR change, and the Manifest decoder already supports it (see spec/0008).

Refusal scenarios

The v0 union-merge MUST refuse loudly (not silently re-encode) in these cases:

  1. Modality present in one Manifest but absent in another.
  2. SpatialIndex hash differs across (self, other, lca) for any modality.
  3. The bucket-merge subroutine finds two records with the same time_anchor (slice assignment was wrong).
  4. The LCA walk doesn't terminate within 1000 steps in either direction.
  5. A non-SpatialBucket Track Object has diverged in an unsupported way (e.g., Constant differs).
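
One way to keep these refusals loud is a dedicated error type with one variant per scenario. The sketch below is an assumption about shape only: `MergeRefusal`, its variant names, and `check_spatial_index` are hypothetical and are not taken from merge.rs.

```rust
use std::fmt;

/// One variant per refusal scenario, in the order listed above.
#[derive(Debug, PartialEq)]
enum MergeRefusal {
    ModalityMissing { modality: String },
    SpatialIndexMismatch { modality: String },
    DuplicateTimeAnchor { time_anchor: u64 },
    LcaWalkExceeded { max_steps: usize },
    UnsupportedDivergence { track_kind: String },
}

impl fmt::Display for MergeRefusal {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MergeRefusal::ModalityMissing { modality } => {
                write!(f, "modality {} is present in one Manifest only", modality)
            }
            MergeRefusal::SpatialIndexMismatch { modality } => {
                write!(f, "SpatialIndex hash differs for modality {}", modality)
            }
            MergeRefusal::DuplicateTimeAnchor { time_anchor } => {
                write!(f, "time_anchor {} appended on both sides", time_anchor)
            }
            MergeRefusal::LcaWalkExceeded { max_steps } => {
                write!(f, "LCA walk did not terminate within {} steps", max_steps)
            }
            MergeRefusal::UnsupportedDivergence { track_kind } => {
                write!(f, "unsupported divergence in {} track", track_kind)
            }
        }
    }
}

impl std::error::Error for MergeRefusal {}

/// Scenario 2: the SpatialIndex hash must agree across self, other, and LCA.
fn check_spatial_index(
    modality: &str,
    self_hash: u64,
    other_hash: u64,
    lca_hash: u64,
) -> Result<(), MergeRefusal> {
    if self_hash == lca_hash && other_hash == lca_hash {
        Ok(())
    } else {
        Err(MergeRefusal::SpatialIndexMismatch { modality: modality.to_string() })
    }
}
```

Returning a typed error rather than a string lets the CLI map each scenario to a distinct exit message without parsing text.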

Multi-branch merge: Dataset::merge_many(branches: &[&str])

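For N parallel ingest branches, the operator wants one atomic merge that captures all N at once. The single-branch algorithm above generalizes as follows (the shipped v0 merge_many applies the single-branch merge sequentially; the batched form is listed as a follow-up under Status):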

  • LCA: must be common across all (N+1) tips. Walk every chain; intersect ancestor sets.
  • Per cell, the conflict set is {branch_i : branch_i_bucket_addr ≠ lca_bucket_addr}. Merge all buckets in the conflict set in one shot.
  • Manifest: parents: vec![T_self, T_b0, T_b1, ..., T_b{N-1}].
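
The first two bullets can be sketched as follows, assuming a first-parent map held in memory. `common_lca`, `conflict_set`, and the `Hash`/`Addr` aliases are hypothetical names for illustration; this is not the shipped merge_many, which runs the single-branch merge sequentially.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-ins for Manifest hashes and bucket addresses.
type Hash = u64;
type Addr = u64;

const MAX_STEPS: usize = 1000;

/// First-parent chain from `tip`, nearest ancestor first, bounded in length.
fn first_parent_chain(tip: Hash, first_parent: &HashMap<Hash, Hash>) -> Vec<Hash> {
    let mut chain = vec![tip];
    let mut cur = tip;
    while let Some(&next) = first_parent.get(&cur) {
        if chain.len() > MAX_STEPS {
            break;
        }
        chain.push(next);
        cur = next;
    }
    chain
}

/// Intersects the ancestor sets of all tips and returns the nearest survivor.
fn common_lca(tips: &[Hash], first_parent: &HashMap<Hash, Hash>) -> Option<Hash> {
    let (first, rest) = tips.split_first()?;
    let mut common = first_parent_chain(*first, first_parent);
    for &tip in rest {
        let set: HashSet<Hash> = first_parent_chain(tip, first_parent).into_iter().collect();
        common.retain(|h| set.contains(h));
    }
    common.first().copied()
}

/// Indices of the branches whose bucket address for a cell differs from the LCA's.
fn conflict_set(lca_addr: Option<Addr>, branch_addrs: &[Option<Addr>]) -> Vec<usize> {
    branch_addrs
        .iter()
        .enumerate()
        .filter(|(_, a)| **a != lca_addr)
        .map(|(i, _)| i)
        .collect()
}
```

Keeping the first tip's chain in walk order means the first element that survives every intersection is the nearest common ancestor rather than an arbitrary shared one.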

Cost analysis at 10B records / N=64 branches:

  • LCA walk: 64 chain walks × ~3 steps each = trivial.
  • Per cell: on average ~10 branches modify each cell. 16.7M cells × 10 GETs = 167M GETs. With buffer_unordered(256) and ~5ms/GET: 167M × 5 ms / 256 ≈ 3,260 s ≈ 54 minutes.
  • Total wall-clock: ~1 hour for the merge phase, dwarfed by the 6+ days of parallel CLIP inference it enables.
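
The GET-phase estimate is simple enough to keep as a small function so it can be re-run with different assumptions. This is a back-of-envelope sketch: `merge_get_seconds` is a hypothetical helper, and it assumes the pipeline stays saturated at the stated concurrency with a uniform per-GET latency.

```rust
/// Estimated wall-clock seconds for the bucket-fetch phase of the merge.
/// Assumes `concurrency` requests are in flight at all times.
fn merge_get_seconds(
    cells: f64,
    branches_per_cell: f64,
    latency_s: f64,
    concurrency: f64,
) -> f64 {
    let total_gets = cells * branches_per_cell;
    total_gets * latency_s / concurrency
}

/// Convenience wrapper using the figures from the cost analysis above.
fn merge_get_minutes_at_10b() -> f64 {
    // 16.7M cells, ~10 modifying branches per cell, 5 ms per GET, 256 in flight.
    merge_get_seconds(16.7e6, 10.0, 0.005, 256.0) / 60.0
}
```

With the figures quoted above, the result comes out a little under an hour; doubling the per-GET latency or halving the concurrency doubles it, which is the sensitivity worth watching on slower backends.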

This is the actual 10B-scale benefit: ingest goes from 99 days serial to ~3.5 days parallel (N=64) + 1 hour merge.

Operator pattern (k8s)

apiVersion: batch/v1
kind: Job
metadata:
  name: dreamdb-ingest-w
spec:
  parallelism: 64
  completions: 64
  completionMode: Indexed
  template:
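    spec:
      # Job pod templates must set restartPolicy to Never or OnFailure.
      restartPolicy: Never
      containers:
        - name: worker
          image: dreamdb-ingest:latest
          command:
            - python
            - -m
            - my.ingest
            # Kubernetes expands $(VAR) in command/args; a bare $VAR is passed through literally.
            - --branch=ingest-w-$(JOB_COMPLETION_INDEX)
            - --slice=$(JOB_COMPLETION_INDEX)/64
            - --ref-source=trunk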
---
apiVersion: batch/v1
kind: Job
metadata:
  name: dreamdb-ingest-merge
spec:
  template:
    spec:
      # Job pod templates must set restartPolicy to Never or OnFailure.
      restartPolicy: Never
      containers:
        - name: orchestrator
          image: dreamdb-cli:latest
          command:
            - dreamdb
            - merge-many
            - --ref=trunk
            - --backend=$BACKEND
            - --branches=ingest-w-0,ingest-w-1,...,ingest-w-63

The merge orchestrator is single-pod (one CAS at the end). The 1 hour merge time is acceptable for a 3.5-day ingest cycle.

Implementation slice (shipped 2026-05-18)

File                                     Diff  Contents
──────────────────────────────────────────────────────────
dreamdb-dataset/src/dataset/merge.rs     +690  union_merge_tracks, find_lca, merge_diverged_track, merge_two_buckets, refresh_field_tracks_from_current
dreamdb-dataset/src/dataset.rs           +20   MergeStrategy::UnionTracks variant + mod merge declaration
dreamdb-cli/src/main.rs                  +60   `merge-many` command
dreamdb-dataset/tests/create_open.rs     +120  3 tests: union_merge_combines_disjoint_branch_appends, union_merge_no_op_when_same_tip, merge_many_combines_three_branches
spec/0008 §5.3                           +30   protocol-level framing: layered-merge vs fused-merge

Protocol formalization (the layered-vs-fused distinction) lives in spec/0008 §5.3. The full algorithm above is normative for v0 implementations of fused-merge; a future revision MAY promote it to its own spec number.

Status

Designed + implemented (2026-05-18). Shipped:

  • MergeStrategy::UnionTracks in dreamdb-dataset — single-branch 3-way union-merge with the algorithm described above. Refusal scenarios are wired (1, 2, 3 above; 4 is bounded at 1000-step LCA walk; 5 is the paged/non-SpatialBucket case).
  • Dataset::merge_many(branches: &[&str]) — sequential wrapper.
  • dreamdb merge-many --backend ... --ref-name <trunk> <branches>... CLI.
  • 3 integration tests: disjoint-branch combine, no-op same-tip, three-branch chain.
  • Bug fix discovered en route: post-FF field_tracks wasn't being refreshed → reads returned stale pre-merge results. Dataset::refresh_field_tracks_from_current() now runs after both FF and UnionTracks.

What remains as follow-up (not on the 10B-blocker critical path):

  • Single-Manifest (N+1)-parent batched merge (one fewer Manifest + CAS per merge run; sequential is fine for now).
  • Fragment / Scalar / Constant union-merge (currently refuses non-embedding diverged tracks).
  • Promotion of this design doc to a full spec/00XX (the algorithm is already normative; just hasn't been assigned a spec number).
  • k8s YAML example in dreamdb-cli/examples/sharded-ingest.yaml.