From d003c6f4823d73dfc77f924bdade36ebbefcfa36 Mon Sep 17 00:00:00 2001 From: Ben Pfaff Date: Thu, 4 Jun 2026 08:16:20 -0700 Subject: [PATCH] [dbsp] Use serde_json to serialize data for Broadcast operator. The Broadcast operator was using rmp_serde to serialize and deserialize data for exchange across hosts, for reasons that have been lost to the mists of time. This serialization corrupted some of our data; for example, it caused an array of integers to become an array of array of stringified integers (!). This commit fixes it by using serde_json instead of rmp_serde. Since this was the only use of rmp_serde, it drops that dependency. Thanks to @ryzhyk for debugging help. Signed-off-by: Ben Pfaff --- Cargo.lock | 1 - crates/dbsp/Cargo.toml | 1 - crates/dbsp/src/circuit/runtime.rs | 4 ++-- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00f93d5d843..ab345bd088e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4014,7 +4014,6 @@ dependencies = [ "rand_xoshiro", "reqwest 0.12.24", "rkyv", - "rmp-serde", "roaring", "seq-macro", "serde", diff --git a/crates/dbsp/Cargo.toml b/crates/dbsp/Cargo.toml index 76785dab65d..d820af34634 100644 --- a/crates/dbsp/Cargo.toml +++ b/crates/dbsp/Cargo.toml @@ -95,7 +95,6 @@ feldera-ir = { workspace = true } smallvec = { workspace = true } async-stream = { workspace = true } futures-util = { workspace = true } -rmp-serde = { workspace = true } feldera-buffer-cache = { workspace = true } memory-stats = { workspace = true } serde_json_path_to_error = { workspace = true } diff --git a/crates/dbsp/src/circuit/runtime.rs b/crates/dbsp/src/circuit/runtime.rs index db57e918a28..766268bc267 100644 --- a/crates/dbsp/src/circuit/runtime.rs +++ b/crates/dbsp/src/circuit/runtime.rs @@ -1413,13 +1413,13 @@ where exchange .send_all_with_serializer(identifier, repeat(local.clone()), |local| { let mut fbuf = FBuf::new(); - rmp_serde::encode::write(&mut fbuf, &local).unwrap(); + serde_json::to_writer(&mut fbuf, &local).unwrap(); fbuf }) .await; exchange - .receive_all(|data| rmp_serde::from_slice(&data).unwrap()) + .receive_all(|data| serde_json::from_slice(&data).unwrap()) .await }) .await