Lightning fast aggregations by distributing DuckDB across AWS Lambda functions

BoilingData
14 min readJan 3, 2023

--

DuckDB is rapidly changing the way data scientists and engineers work. It’s efficient and internally parallelised architecture means that a single querying node often out-competes entire clusters of more traditional query engines.

But out of the box, DuckDB needs to be run on a single node meaning the hardware naturally limits performance. The typical way of solving this in a production environment involves scaling out or scaling up your DuckDB infrastructure: increasing the size of your querying instance or distributing concurrent query jobs to different hardware. Traditionally, scaling out or up are slow and expensive tasks that involve waiting for an upstream provider to allocate resources and start new servers.

The time and cost of scaling DuckDB infrastructure is a good trade-off for many analytical workloads — particularly if you are generating and pre-caching analytics for later consumption. This only works for some businesses in 2023: these days, our users expect more than rigid and overly optimised analytics — they want to dynamically filter, generate trends over long periods, and analyse data in real-time.

Distributing queries across an AWS Lambda DuckDB cluster

Serverless computing presents an opportunity to solve both the cost and cold start problem. AWS Lambda instances are relatively small and underpowered (6 CPU cores, 10GB RAM, max 0.5Gbps network throughput), but have a core benefit of being charged per millisecond and having almost no time/cost overhead when starting, pausing, and resuming instances.

If we can distribute our dataset across 3,000 Lambda instances, data will be downloaded at 1.5Tbps, and we’ll utilise 18,000 CPUs/30TB RAM (10GB Lambdas) to execute queries.

Suddenly, it becomes possible to keep all our data in cold storage and rapidly load it into a serverless DuckDB cluster at query time for a real-time interactive analysis session.

Distributed DuckDB on Lambda vs. single instance DuckDB benchmarks — NYC Yellow Taxi Dataset

To create a benchmark, we used the popular NYC Yellow taxi dataset GZip parquet files from the years 2011–2022 (as they have a common schema). The dataset has about 1.34b rows and is a challenging dataset to run queries over: it is compressed with Gzip, which is not the best algorithm from CPU and network perspective, and the Parquet files have many row groups, which slows down OLAP queries. A more optimised version of the dataset would run faster queries, for instance, with larger row groups and ZSTD compression.

The compressed Parquet files are 18GB across 141 files. This is a total of 118GB uncompressed CSV data (without CSV escapes or hyphens over columns — a 6.56x compression ratio). Downloading the files to the laptop from S3 with an average speed of 30MB/s takes 10 minutes, and to an EC2 instance in the same region as the data takes 50 seconds.

We ran two queries on each system: first, a query that can be easily distributed while combining the results at the end (i.e. COUNT and SUM functions). The second query uses average (AVG), for which we calculate accurate results by running the query over all the contributing rows in one pass. This requires shuffling on the Boiling side, which we have implemented in a network and Lambda-optimised manner. The shuffling is implemented as a generic mechanism to address distributed queries that are not easily combinable.

We wanted these tests to reflect “real world” conditions, so the “Laptop” tests started with the entire dataset already downloaded locally onto the SSD. In contrast, the Boiling and EC2 tests started completely cold and began by downloading data from the object store (S3).

1. Simple combinable query

Laptop with SSD

We ran DuckDB v0.6.1 on OSX (MB Pro 2019, 2.3Ghz, i9 8-core, 16GB DDR4, 1TB SSD).

v0.6.1 919cad22e8
D SELECT COUNT(*) AS total5,
strftime(tpep_pickup_datetime, '%Y-%m-%d') AS pickup4
FROM parquet_scan('*.parquet')
WHERE CAST(strftime(tpep_pickup_datetime, '%Y') AS INTEGER)
BETWEEN 2011 AND 2022
GROUP BY pickup4 ORDER BY pickup4 LIMIT 20;
┌────────┬────────────┐
│ total5 │ pickup4 │
│ int64 │ varchar │
├────────┼────────────┤
│ 183031 │ 2011-01-01 │
│ 158706 │ 2011-01-02 │
│ 374927 │ 2011-01-03 │
│ 407012 │ 2011-01-04 │
│ 439070 │ 2011-01-05 │
│ 476390 │ 2011-01-06 │
│ 486988 │ 2011-01-07 │
│ 500272 │ 2011-01-08 │
│ 451117 │ 2011-01-09 │
│ 449946 │ 2011-01-10 │
│ 471811 │ 2011-01-11 │
│ 361127 │ 2011-01-12 │
│ 488565 │ 2011-01-13 │
│ 512868 │ 2011-01-14 │
│ 485503 │ 2011-01-15 │
│ 446699 │ 2011-01-16 │
│ 386391 │ 2011-01-17 │
│ 442217 │ 2011-01-18 │
│ 477235 │ 2011-01-19 │
│ 501964 │ 2011-01-20 │
├────────┴────────────┤
│ 20 rows 2 columns │
└─────────────────────┘
Run Time (s): real 35.152 user 507.912590 sys 3.608308

DuckDB took 35 seconds to run this query.

EC2 instance (c6a.metal)

The query was run on a c6a.metal instance in the same region as the data. This machine has 192 CPU cores and 384GB RAM, and is the largest instance in the c6 range, coming in at $7.34 per hour.

The initial data download from S3 ran at 368MiB/s and took 50 seconds.

v0.6.1 919cad22e8
D SELECT COUNT(*) AS total5,
strftime(tpep_pickup_datetime, '%Y-%m-%d') AS pickup4
FROM parquet_scan('*.parquet')
WHERE CAST(strftime(tpep_pickup_datetime, '%Y') AS INTEGER)
BETWEEN 2011 AND 2022
GROUP BY pickup4 ORDER BY pickup4 LIMIT 20;
┌────────┬────────────┐
│ total5 │ pickup4 │
│ int64 │ varchar │
├────────┼────────────┤
│ 183031 │ 2011-01-01 │
│ 158706 │ 2011-01-02 │
│ 374927 │ 2011-01-03 │
│ 407012 │ 2011-01-04 │
│ 439070 │ 2011-01-05 │
│ 476390 │ 2011-01-06 │
│ 486988 │ 2011-01-07 │
│ 500272 │ 2011-01-08 │
│ 451117 │ 2011-01-09 │
│ 449946 │ 2011-01-10 │
│ 471811 │ 2011-01-11 │
│ 361127 │ 2011-01-12 │
│ 488565 │ 2011-01-13 │
│ 512868 │ 2011-01-14 │
│ 485503 │ 2011-01-15 │
│ 446699 │ 2011-01-16 │
│ 386391 │ 2011-01-17 │
│ 442217 │ 2011-01-18 │
│ 477235 │ 2011-01-19 │
│ 501964 │ 2011-01-20 │
├────────┴────────────┤
│ 20 rows 2 columns │
└─────────────────────┘
Run Time (s): real 4.065 user 302.831 sys 1.822

The initial query from cold S3 data took 50s + 4s = 54s

BoilingData with 61 Lambdas

The query was run on Boiling, and the query planner chose to run it over 61 Lambdas concurrently. The initial query from all-cold data on S3 took 0.8s + 6.7s = 7.5s. In other words, we didn’t import the existing data in any way or pre-warm any Lambdas before we hit them with the query

We reran the query, using the Lambda cluster that was warmed in the last test. The results were much better: 1.3s — 1.7s. In this query, the column was changed to bypass in-memory result caches.

2. Query with an additional column (avg)

Laptop with SSD

v0.6.1 919cad22e8
D SELECT COUNT(*) AS total5, round(AVG(trip_distance),4) AS adist,
strftime(tpep_pickup_datetime, '%Y-%m-%d') AS pickup4
FROM parquet_scan('*.parquet')
WHERE CAST(strftime(tpep_pickup_datetime, '%Y') AS INTEGER)
BETWEEN 2011 AND 2022
GROUP BY pickup4 ORDER BY pickup4 LIMIT 20;
┌────────┬────────┬────────────┐
│ total5 │ adist │ pickup4 │
│ int64 │ double │ varchar │
├────────┼────────┼────────────┤
│ 183031 │ 3.0448 │ 2011-01-01 │
│ 158706 │ 3.169 │ 2011-01-02 │
│ 374927 │ 2.8845 │ 2011-01-03 │
│ 407012 │ 2.716 │ 2011-01-04 │
│ 439070 │ 2.6061 │ 2011-01-05 │
│ 476390 │ 2.5925 │ 2011-01-06 │
│ 486988 │ 2.5055 │ 2011-01-07 │
│ 500272 │ 2.5656 │ 2011-01-08 │
│ 451117 │ 2.8534 │ 2011-01-09 │
│ 449946 │ 2.6484 │ 2011-01-10 │
│ 471811 │ 2.5189 │ 2011-01-11 │
│ 361127 │ 2.398 │ 2011-01-12 │
│ 488565 │ 2.647 │ 2011-01-13 │
│ 512868 │ 2.6572 │ 2011-01-14 │
│ 485503 │ 2.6565 │ 2011-01-15 │
│ 446699 │ 2.8195 │ 2011-01-16 │
│ 386391 │ 2.8472 │ 2011-01-17 │
│ 442217 │ 2.5776 │ 2011-01-18 │
│ 477235 │ 2.6019 │ 2011-01-19 │
│ 501964 │ 2.6306 │ 2011-01-20 │
├────────┴────────┴────────────┤
│ 20 rows 3 columns │
└──────────────────────────────┘
Run Time (s): real 39.741 user 561.296748 sys 5.281544

DuckDB took 39 seconds to run this query.

EC2 Instance

v0.6.1 919cad22e8
D SELECT COUNT(*) AS total5, round(AVG(trip_distance),4) AS adist,
strftime(tpep_pickup_datetime, '%Y-%m-%d') AS pickup4
FROM parquet_scan('*.parquet')
WHERE CAST(strftime(tpep_pickup_datetime, '%Y') AS INTEGER)
BETWEEN 2011 AND 2022
GROUP BY pickup4 ORDER BY pickup4 LIMIT 20;
┌────────┬────────┬────────────┐
│ total5 │ adist │ pickup4 │
│ int64 │ double │ varchar │
├────────┼────────┼────────────┤
│ 183031 │ 3.0448 │ 2011-01-01 │
│ 158706 │ 3.169 │ 2011-01-02 │
│ 374927 │ 2.8845 │ 2011-01-03 │
│ 407012 │ 2.716 │ 2011-01-04 │
│ 439070 │ 2.6061 │ 2011-01-05 │
│ 476390 │ 2.5925 │ 2011-01-06 │
│ 486988 │ 2.5055 │ 2011-01-07 │
│ 500272 │ 2.5656 │ 2011-01-08 │
│ 451117 │ 2.8534 │ 2011-01-09 │
│ 449946 │ 2.6484 │ 2011-01-10 │
│ 471811 │ 2.5189 │ 2011-01-11 │
│ 361127 │ 2.398 │ 2011-01-12 │
│ 488565 │ 2.647 │ 2011-01-13 │
│ 512868 │ 2.6572 │ 2011-01-14 │
│ 485503 │ 2.6565 │ 2011-01-15 │
│ 446699 │ 2.8195 │ 2011-01-16 │
│ 386391 │ 2.8472 │ 2011-01-17 │
│ 442217 │ 2.5776 │ 2011-01-18 │
│ 477235 │ 2.6019 │ 2011-01-19 │
│ 501964 │ 2.6306 │ 2011-01-20 │
├────────┴────────┴────────────┤
│ 20 rows 3 columns │
└──────────────────────────────┘
Run Time (s): real 4.621 user 325.146 sys 2.284

Initial query from cold S3 data: 50s + 4.6s = 54.6s

BoilingData

Initial query from all cold S3 data: 3.6s + 14s = 17.6s.

BoilingData cold queries incur warming up time when data is loaded in parallel from S3. This query ran shuffling due to the AVG aggregation function.

Further queries on the warmed Lambda cluster took 6.5s — 8s. Boiling routes subsequent queries to Lambdas which already have the data in memory, so the query can be changed to run different aggregations.

BoilingData warm query times are much faster as the data is in an on-demand, dedicated, and distributed in-memory database layer

Cloud managed Presto query times

  SELECT COUNT(*) AS total5, round(AVG(trip_distance),4) AS adist, 
date_format(tpep_pickup_datetime, '%Y-%m-%d') AS pickup4
FROM "boilingdata-benchmark"."nyc6trip_data"
WHERE CAST(date_format(tpep_pickup_datetime, '%Y') AS INTEGER)
BETWEEN 2011 AND 2022
GROUP BY date_format(tpep_pickup_datetime, '%Y-%m-%d')
ORDER BY date_format(tpep_pickup_datetime, '%Y-%m-%d') LIMIT 20;

For comparison, we ran the queries on a popular cloud-managed Presto service. The complex query (having queued for 135ms), had a total runtime of 77s — 124s (data scanned: 5.54GB). The more straightforward (combinable) query queued for 260ms, and had a total query runtime 32–40s (data scanned: 3.85GB).

Summary of test results

Using an instantly scalable serverless DuckDB cluster, it is easy to beat a Laptop as it is bound by SSD, memory, and available cores. With any sized dataset, Boiling will allocate many Lambdas to provide more pure raw CPU power for decompressing the GZip parquet files whilst overcoming the incurred latency from the shuffling and coordination.

The challenging parts are mastering the distributed Lambda compute layer, implementing (and running) a performant shuffle on top of it that actually works, and running subsequent queries over the warmed in-memory data set so that S3 is not touched every single query.

Downloading 18GB from S3 to a laptop takes a long time and S3 egress costs come in at $0.50 — $0.90. Even though downloading is faster on the EC2, 50 seconds is a slow cold start for an interactive dashboarding session, and costly at almost $0.10 using this instance.

Our queries didn’t touch all the columns, so downloading the entire dataset to the machine is wasteful (if all the data is not consumed). However, downloading that data from S3 in an optimal manner with distributed Lambdas achieves high aggregated throughput without AWS egress fees.

Distributed combinable queries are much faster as they don’t require shuffling

From cold, BoilingData is between 3x and 7x faster than querying using an EC2 instance. Once data has been warmed, BoilingData excels in combinable queries (3x faster), whereas EC2 wins the race for more complex queries that require us to shuffle data between Lambdas (1.4x slower).

We wanted to test whether BoilingData can compete with duckdb running on laptop and a top-of-the-line EC2, and even with this small dataset, it can.

Want to learn more about how we distribute queries across Lambda? Check out our latest post:

Redefining SaaS Analytics: Dynamic, Real-time Insights at a Fraction of the Cost

Postscript: Data we didn’t include

In this test, we used DuckDB’s parquet_scan function to query the data files — this function scans through the source data without materialising it into a table, creating indexes, etc. One argument against using parquet_scan is that (in theory) allowing DuckDB to create an optimised table will mean that subsequent queries will run faster.

We tested this on the c6a.metal instance by creating an in-memory DuckDB table and running these queries against it, but in the end, it added a full minute for table creation and didn’t improve the query speed at all:

D CREATE TABLE test AS SELECT * FROM '*.parquet';
100% ▕████████████████████████████████████████████████████████████▏
Run Time (s): real 63.265 user 538.439722 sys 1380.067524

D SELECT COUNT(*) AS total5,
> strftime(tpep_pickup_datetime, '%Y-%m-%d') AS pickup4
> FROM test
> WHERE CAST(strftime(tpep_pickup_datetime, '%Y') AS INTEGER)
> BETWEEN 2011 AND 2022
> GROUP BY pickup4 ORDER BY pickup4 LIMIT 20;
50% ▕██████████████████████████████ ▏ ┌────────┬────────────┐
│ total5 │ pickup4 │
│ int64 │ varchar │
├────────┼────────────┤
│ 183031 │ 2011-01-01 │
│ 158706 │ 2011-01-02 │
│ 374927 │ 2011-01-03 │
│ 407012 │ 2011-01-04 │
│ 439070 │ 2011-01-05 │
│ 476390 │ 2011-01-06 │
│ 486988 │ 2011-01-07 │
│ 500272 │ 2011-01-08 │
│ 451117 │ 2011-01-09 │
│ 449946 │ 2011-01-10 │
│ 471811 │ 2011-01-11 │
│ 361127 │ 2011-01-12 │
│ 488565 │ 2011-01-13 │
│ 512868 │ 2011-01-14 │
│ 485503 │ 2011-01-15 │
│ 446699 │ 2011-01-16 │
│ 386391 │ 2011-01-17 │
│ 442217 │ 2011-01-18 │
│ 477235 │ 2011-01-19 │
│ 501964 │ 2011-01-20 │
├────────┴────────────┤
│ 20 rows 2 columns │
└─────────────────────┘
Run Time (s): real 3.936 user 417.322142 sys 8.984839

D SELECT COUNT(*) AS total5, round(AVG(trip_distance),4) AS adist,
> strftime(tpep_pickup_datetime, '%Y-%m-%d') AS pickup4
> FROM test
> WHERE CAST(strftime(tpep_pickup_datetime, '%Y') AS INTEGER)
> BETWEEN 2011 AND 2022
> GROUP BY pickup4 ORDER BY pickup4 LIMIT 20;
100% ▕████████████████████████████████████████████████████████████▏
┌────────┬────────┬────────────┐
│ total5 │ adist │ pickup4 │
│ int64 │ double │ varchar │
├────────┼────────┼────────────┤
│ 183031 │ 3.0448 │ 2011-01-01 │
│ 158706 │ 3.169 │ 2011-01-02 │
│ 374927 │ 2.8845 │ 2011-01-03 │
│ 407012 │ 2.716 │ 2011-01-04 │
│ 439070 │ 2.6061 │ 2011-01-05 │
│ 476390 │ 2.5925 │ 2011-01-06 │
│ 486988 │ 2.5055 │ 2011-01-07 │
│ 500272 │ 2.5656 │ 2011-01-08 │
│ 451117 │ 2.8534 │ 2011-01-09 │
│ 449946 │ 2.6484 │ 2011-01-10 │
│ 471811 │ 2.5189 │ 2011-01-11 │
│ 361127 │ 2.398 │ 2011-01-12 │
│ 488565 │ 2.647 │ 2011-01-13 │
│ 512868 │ 2.6572 │ 2011-01-14 │
│ 485503 │ 2.6565 │ 2011-01-15 │
│ 446699 │ 2.8195 │ 2011-01-16 │
│ 386391 │ 2.8472 │ 2011-01-17 │
│ 442217 │ 2.5776 │ 2011-01-18 │
│ 477235 │ 2.6019 │ 2011-01-19 │
│ 501964 │ 2.6306 │ 2011-01-20 │
├────────┴────────┴────────────┤
│ 20 rows 3 columns │
└──────────────────────────────┘
Run Time (s): real 6.451 user 442.361772 sys 544.996610

In this article we downloaded the entire dataset to the EC2, and queried the local copy — this added 50 seconds to the warm up time. Another potential way that DuckDB on EC2 could have been faster is by parquet_scanning S3 directly (because parquet_scan only accesses the relevant byte ranges of the source data, not the whole dataset).

We tested this, and while the initial query would have been ~30 seconds faster, it meant that the subsequent query would have been ~25 seconds slower. On balance, no massive performance improvement (and in fact, it is much slower if you wanted to run three or more queries on this dataset):

D SELECT COUNT(*) AS total5,
> strftime(tpep_pickup_datetime, '%Y-%m-%d') AS pickup4
> FROM parquet_scan('s3://isecurefi-dev-test/nyc-tlc/trip_data/*')
> WHERE CAST(strftime(tpep_pickup_datetime, '%Y') AS INTEGER)
> BETWEEN 2011 AND 2022
> GROUP BY pickup4 ORDER BY pickup4 LIMIT 20;
WHERE CAST(strftime(tpep_pickup_datetime, '%Y') AS INTEGER)
BETWEEN 2011 AND 2022
GROUP BY pickup4 ORDER BY pickup4 LIMIT 20;
100% ▕████████████████████████████████████████████████████████████▏
┌────────┬────────────┐
│ total5 │ pickup4 │
│ int64 │ varchar │
├────────┼────────────┤
│ 183031 │ 2011-01-01 │
│ 158706 │ 2011-01-02 │
│ 374927 │ 2011-01-03 │
│ 407012 │ 2011-01-04 │
│ 439070 │ 2011-01-05 │
│ 476390 │ 2011-01-06 │
│ 486988 │ 2011-01-07 │
│ 500272 │ 2011-01-08 │
│ 451117 │ 2011-01-09 │
│ 449946 │ 2011-01-10 │
│ 471811 │ 2011-01-11 │
│ 361127 │ 2011-01-12 │
│ 488565 │ 2011-01-13 │
│ 512868 │ 2011-01-14 │
│ 485503 │ 2011-01-15 │
│ 446699 │ 2011-01-16 │
│ 386391 │ 2011-01-17 │
│ 442217 │ 2011-01-18 │
│ 477235 │ 2011-01-19 │
│ 501964 │ 2011-01-20 │
├────────┴────────────┤
│ 20 rows 2 columns │
└─────────────────────┘
Run Time (s): real 29.428 user 244.316008 sys 8.996392

D SELECT COUNT(*) AS total5, round(AVG(trip_distance),4) AS adist,
> strftime(tpep_pickup_datetime, '%Y-%m-%d') AS pickup4
> FROM parquet_scan('s3://isecurefi-dev-test/nyc-tlc/trip_data/*')
> WHERE CAST(strftime(tpep_pickup_datetime, '%Y') AS INTEGER)
> BETWEEN 2011 AND 2022
> GROUP BY pickup4 ORDER BY pickup4 LIMIT 20;
100% ▕████████████████████████████████████████████████████████████▏
┌────────┬────────┬────────────┐
│ total5 │ adist │ pickup4 │
│ int64 │ double │ varchar │
├────────┼────────┼────────────┤
│ 183031 │ 3.0448 │ 2011-01-01 │
│ 158706 │ 3.169 │ 2011-01-02 │
│ 374927 │ 2.8845 │ 2011-01-03 │
│ 407012 │ 2.716 │ 2011-01-04 │
│ 439070 │ 2.6061 │ 2011-01-05 │
│ 476390 │ 2.5925 │ 2011-01-06 │
│ 486988 │ 2.5055 │ 2011-01-07 │
│ 500272 │ 2.5656 │ 2011-01-08 │
│ 451117 │ 2.8534 │ 2011-01-09 │
│ 449946 │ 2.6484 │ 2011-01-10 │
│ 471811 │ 2.5189 │ 2011-01-11 │
│ 361127 │ 2.398 │ 2011-01-12 │
│ 488565 │ 2.647 │ 2011-01-13 │
│ 512868 │ 2.6572 │ 2011-01-14 │
│ 485503 │ 2.6565 │ 2011-01-15 │
│ 446699 │ 2.8195 │ 2011-01-16 │
│ 386391 │ 2.8472 │ 2011-01-17 │
│ 442217 │ 2.5776 │ 2011-01-18 │
│ 477235 │ 2.6019 │ 2011-01-19 │
│ 501964 │ 2.6306 │ 2011-01-20 │
├────────┴────────┴────────────┤
│ 20 rows 3 columns │
└──────────────────────────────┘
Run Time (s): real 28.016 user 265.048058 sys 10.144067

--

--

BoilingData
BoilingData

Written by BoilingData

Scalable on-the-fly analytics from cold data (SQL), easy apps (code), and hot services (APIs). Where data, apps, and services merge!

Responses (4)