~/articles/design-top-k-heavy-hitters
◆◆◆Advancedasked at Twitterasked at Metaasked at Googleasked at Amazon

Design Top-K / Trending (heavy hitters)

Find the top-K most frequent items in a massive stream without counting everything exactly. Count-Min Sketch, heavy-hitter algorithms, and approximate streaming aggregation.

22 min read2026-04-21Ironclad Academy
// DEPTH
the full breakdown — requirements, capacity, evolution, trade-offs

The problem

Twitter's trending sidebar, YouTube's "trending" tab, Amazon's hourly best-sellers — each of these answers the same deceptively simple question: of the billions of events flowing through the system right now, which K items are appearing most often? Twitter processes hundreds of millions of tweets per day; a hashtag can go from zero to global trending in under a minute. The system has to notice that, rank it, and surface it — continuously, in near-real-time, across every region.

The naive approach is a hashmap: one counter per item, increment on every event. At Twitter's scale you're tracking 500 million active hashtags. At 50 bytes per entry that's 25 GB of state per process, and you'd need one per shard. Worse, the long tail of items that each appear only once or twice takes as much memory as anything else — you're paying full price for every distinct string that ever touched the stream.

Top-K hides two hard sub-problems that pull against each other. Frequency estimation needs to answer "how often has this item appeared?" for any item, cheaply, even when the total item space is enormous. Top-K selection needs to rank those estimates and produce a sorted list of K winners, continuously, without scanning all items on every query. Probabilistic data structures — specifically the Count-Min Sketch — break the O(distinct items) memory barrier by trading a provably bounded overcount error for a fixed-size counter array of a few kilobytes.

The engineering tension is accuracy versus memory: exact counting is correct but infeasible past a few million distinct items; approximate algorithms let you tune the error-memory trade-off explicitly and scale to billions. That trade-off, and the distributed machinery around it, is what this article covers.

Functional requirements

  • POST /event { item_id } — ingest a stream of item occurrences.
  • GET /top-k?k=100&window=5m — return the top-K most frequent items in the last N minutes.
  • Optional: breakdown by category; per-minute trending (delta-based ranking).
  • Optional: deduplicated counts (top-K by unique users who viewed, not raw events).

Non-functional requirements

  • Throughput: sustain peak ingest of 5–10 M events/sec across the cluster.
  • Memory: bounded per node regardless of the number of distinct items.
  • Accuracy: ≤ 1% relative error on frequency estimates for top-K items acceptable; missing a true top-K item entirely is not acceptable.
  • Latency: query the current top-K in < 10 ms.
  • Freshness: results reflect events from the last 30–60 seconds within a window.

Capacity estimation

DimensionEstimateHow we got there
Avg event rate~1.16 M events/sec100 × 10⁹ ÷ 86 400
Peak event rate~5 M events/sec~4× average, accounting for prime-time spikes
Distinct items~1 BHashtags ~500 M (Twitter scale); product IDs ~1 B (Amazon scale)
Exact hashmap cost50 GB per node1 B items × 50 bytes/item (8-byte key + 8-byte count + overhead) → not feasible in-memory per aggregator shard
CMS width (ε=0.1%)2 719 counters → rounded to 4 096w = ⌈e / ε⌉ = ⌈2.71828 / 0.001⌉ = 2 719
CMS depth (δ=0.1%)7 rowsd = ⌈ln(1/δ)⌉ = ⌈ln(1 000)⌉ = 7
CMS memory per sketch112 KB7 rows × 4 096 cells × 4 bytes/counter = 28 672 cells × 4 bytes → fits in L2 cache; thousands of shards feasible
Min-heap cost (K=1 000)~16 KB1 000 items × (8-byte ID + 8-byte count) → negligible
Event size~64 bytesitem_id (8 B) + timestamp (8 B) + metadata (~48 B)
Peak ingest bandwidth320 MB/sec across cluster5 M/sec × 64 bytes
Per-partition bandwidth~10 MB/sec320 MB/sec ÷ 32 Kafka partitions → comfortable for a Kafka broker

Takeaway: The sketch-vs-hashmap contrast (112 KB vs. 50 GB) is the whole point of this problem. State it explicitly in the interview.

Why exact counting fails

Before reaching for probabilistic structures, it's worth being precise about why a plain hashmap is not the answer at scale.

A server processing a partition of the event stream sees a stream of item_id values. If you keep Map<item_id, count> in memory, the map grows proportionally to the number of distinct items seen, not the total event count. Given a Zipfian distribution (realistic for hashtags, searches, products), there are a few very popular items and a long tail of items that each appear a handful of times. The tail dominates memory — at 1 B distinct items even with 50% of counts at 1, you still need 25+ GB on a single node.

You can shard the map horizontally by hash(item_id), but then finding the global top-K requires a multi-shard merge. That merge is correct only if every item's shard knows its complete count — which is true for exact counting but the sharding doesn't reduce peak memory per shard if your hash space is dense.

The root issue: exact cardinality tracking requires O(distinct items) space regardless of how you distribute it. Approximation algorithms trade a provably bounded error for O(1) or O(log N) space.

The two core algorithms

Count-Min Sketch — approximate frequency

A Count-Min Sketch is a 2-D array of counters with dimensions d × w:

  • d (depth): number of independent hash functions, one per row.
  • w (width): number of counter cells per row.

Update (item_id): for each row r in [0, d), compute h_r(item_id) mod w and increment counters[r][h_r(item_id) mod w].

Query (item_id): return min over r of counters[r][h_r(item_id) mod w].

           w counters per row
           ┌──────────────────────────────────────────┐
   row 000703020  │  ← h0(x) % w
   row 104006002  │  ← h1(x) % w
   row 230050040  │  ← h2(x) % w
           └──────────────────────────────────────────┘
              count(x) = min(7, 6, 5) = 5

The query takes the minimum across all rows because each row's counter for item x may have been bumped by hash collisions with other items. The minimum is the tightest upper bound available — it can't be lower than the true count (no row has lost any increments) but it won't be inflated by all the collisions at once.

Error guarantee. With probability at least 1 - δ:

true_count(x)  ≤  estimated_count(x)  ≤  true_count(x) + ε × N

where N is the total number of events inserted and:

  • w = ⌈e / ε⌉
  • d = ⌈ln(1 / δ)⌉

Three properties matter for the interview:

  1. Overcount only, never undercount. The minimum across rows gives the tightest upper bound. Hash collisions cause bumped counters; the minimum dampens this but cannot reduce a true count.
  2. Sublinear memory. The sketch size is fixed at d × w cells regardless of cardinality. Choosing ε=0.1%, δ=0.1% gives 7 rows × 4096 cells — 112 KB.
  3. Mergeable. Two sketches built from disjoint event sets can be merged by element-wise addition, provided both sketches were constructed with identical hash functions and seeds — different seeds produce different bucket mappings and make addition meaningless. This is what makes the distributed pattern work: all aggregator shards must share the same sketch configuration.

Relation to Bloom filters. A Bloom filter answers "have I seen this item at all?" using a similar hash-and-bit-array structure. A Count-Min Sketch extends the idea to "how many times have I seen this item?" — think of each counter row as an independent frequency register rather than a membership bit.

flowchart TD
    ITEM["item_id = 'WorldCup'"] --> H0["h0(item) % w → column 2"]
    ITEM --> H1["h1(item) % w → column 4"]
    ITEM --> H2["h2(item) % w → column 3"]
    H0 --> R0["row 0, col 2: increment"]
    H1 --> R1["row 1, col 4: increment"]
    H2 --> R2["row 2, col 3: increment"]
    R0 --> MIN["query: min(row0[2], row1[4], row2[3])"]
    R1 --> MIN
    R2 --> MIN
    MIN --> EST["estimated count"]
    style ITEM fill:#0e7490,color:#fff
    style MIN fill:#ff6b1a,color:#0a0a0f
    style EST fill:#15803d,color:#fff

Space-Saving / Misra-Gries — bounded counter heavy hitters

An alternative algorithm (Misra-Gries, 1982; the "Space-Saving" variant by Metwally et al., 2005) maintains exactly K counters at all times:

  • When an item arrives, if it already has a counter, increment it.
  • If there are fewer than K counters, add the item with count = 1.
  • Otherwise, find the item with the minimum counter value (say m), replace it with the new item, and set its count to m + 1.

Guarantee. Any item whose true count exceeds N / (K + 1) is guaranteed to appear in the result. Items not in the result set had a true frequency below that threshold.

Space-Saving is exact for top-K candidates (no false negatives above the threshold) but it can still overcount (the replaced item's counter is inflated). It uses O(K) space, which is larger than a Count-Min Sketch for large K but gives you a compact ranked list directly.

In practice, many production implementations combine both: a Count-Min Sketch to answer arbitrary frequency queries, plus a min-heap of size K updated with sketch estimates to track the current top-K candidates.

Combining sketch + heap

The practical top-K data structure:

struct TopKTracker {
    sketch:  CountMinSketch          // estimates freq(x) for any x
    heap:    MinHeap<(count, item)>  // current top-K candidates
    seen:    HashSet<item>           // items already in heap (fast membership)
}

function observe(item):
    sketch.increment(item)
    freq = sketch.query(item)
    if item in seen:
        heap.update(item, freq)
    elif heap.size < K:
        heap.push(item, freq)
        seen.add(item)
    elif freq > heap.min():
        evicted = heap.pop_min()
        seen.remove(evicted.item)
        heap.push(item, freq)
        seen.add(item)

The heap tracks which K items to expose; the sketch tracks how frequent every item actually is. The heap entries are refreshed with sketch estimates on every observation of an item already in the heap.

Cost: O(d) per event for the sketch update (constant for fixed d), O(log K) per event when the heap changes (rare for large N). Total: effectively O(1) per event in steady state.

Building up to the design

V1: Single node, exact hashmap

counts = {}
def observe(item):
    counts[item] = counts.get(item, 0) + 1

def top_k(k):
    return heapq.nlargest(k, counts.items(), key=lambda x: x[1])

This works and gives you the exact right answer. It's the right place to start in an interview: establish correctness before talking about what breaks.

What breaks is memory. At 1 B distinct items, counts grows to 50 GB. And one node can't sustain 5 M events/sec — you need to shard. That combination pushes you to V2.

V2: Sharded exact counting, then merge

Partition the event stream by hash(item_id) % N across N aggregator nodes. Each holds a subset of the item space. On query, each node returns its local top-K; the coordinator merges and picks the global top-K.

The subtle bug here trips up a lot of candidates. If you naively take the top-K from each shard and merge them, you will miss items that appear just below K on every shard but are globally top-K. Imagine item X appears 100 times on each of 10 shards — 1,000 total — but the local top-10 cutoff on each shard is 110. X never makes anyone's local list, so it never reaches the merge step. The fix is to send more than K items from each shard (a margin of K × fanout_factor) or route all events for a given item to the same shard so each shard's count is the complete global count.

Even with that fixed, the memory per shard is still O(distinct items on that shard), and the hottest shard gets all events for the single most popular item.

V3: Per-shard Count-Min Sketch + local top-K heap

Replace the exact hashmap on each aggregator with a TopKTracker (CMS + min-heap). Memory per shard drops from O(distinct items) to O(sketch_size + K) — a fixed number regardless of how many items stream through.

Now you can handle 5+ M events/sec across shards. The result is correct within the sketch's error bound. But there's still a gap: as time passes, counts from hours ago inflate the numbers for items that were popular then but aren't trending now.

V4: Time windowing

Two approaches:

Tumbling epochs. Every T seconds (e.g. T=60), reset the sketch and heap. Each window is an independent count. Fast, simple, but count resets abruptly — an item that spanned the boundary gets counted in neither window fully.

Sliding window with multiple epoch buckets. Keep the last W / T sketch buckets in a ring buffer. The current count for an item is the sum of its sketch estimates across all live buckets. When a bucket expires, drop it. Memory cost: W/T sketches instead of 1.

Exponential decay. Multiply all counters by a decay factor α < 1 every second (or per event batch). An item's effective count at time t is approximately ∑ α^(t - t_i) over its event times t_i — a geometric series that gives recent events more weight. This naturally smooths the decay without hard window resets but makes merging sketches slightly more complex (sketches must share the same decay timeline).

flowchart LR
    T0["Epoch t=0<br/>sketch S0"] --> T1["Epoch t=1<br/>sketch S1"]
    T1 --> T2["Epoch t=2<br/>sketch S2"]
    T2 --> LIVE["Sliding window<br/>S0 + S1 + S2<br/>(ring buffer)"]
    T2 -.expires.-> DROP["S0 dropped"]
    style LIVE fill:#15803d,color:#fff
    style DROP fill:#ff2e88,color:#fff

V5: Distributed merge + Lambda audit

The stream answer is approximate. For dashboards or billing that need exact counts, you keep a parallel batch path: raw events land in S3, and a nightly Spark or Flink job recomputes exact top-K for completed windows. That's the Lambda architecture applied here — speed layer (approximate, seconds of latency) plus batch layer (exact, hours of latency).

flowchart LR
    V1["V1: hashmap<br/>exact, single node"] --> V2["V2: sharded exact<br/>merge bug risk"]
    V2 --> V3["V3: CMS + heap/shard<br/>bounded memory"]
    V3 --> V4["V4: + time windows<br/>trending = recent"]
    V4 --> V5["V5: + merge service<br/>+ batch audit"]
    style V1 fill:#0e7490,color:#fff
    style V3 fill:#ff6b1a,color:#0a0a0f
    style V4 fill:#15803d,color:#fff
    style V5 fill:#a855f7,color:#fff

The rest of this article covers the production V5 design.

High-level architecture

flowchart TD
    CLI[Clients<br/>apps / services] --> GW[API Gateway]
    GW --> KAFKA[Kafka<br/>16+ partitions]
    KAFKA --> AGG1[Aggregator 0<br/>CMS + heap]
    KAFKA --> AGG2[Aggregator 1<br/>CMS + heap]
    KAFKA --> AGGN["Aggregator N<br/>CMS + heap"]
    AGG1 --> MERGE[Merge Service]
    AGG2 --> MERGE
    AGGN --> MERGE
    MERGE --> REDIS[(Redis<br/>current top-K)]
    MERGE --> TS[(Time-series store<br/>top-K snapshots)]
    REDIS --> QAPI[Query API]
    QAPI --> CLI2[Consumers]

    KAFKA --> S3[(S3 / data lake<br/>raw events)]
    S3 --> BATCH[Batch recount job<br/>Spark / Flink]
    BATCH --> AUDIT[(Audit store)]

    style KAFKA fill:#0e7490,color:#fff
    style AGG1 fill:#ff6b1a,color:#0a0a0f
    style AGG2 fill:#ff6b1a,color:#0a0a0f
    style AGGN fill:#ff6b1a,color:#0a0a0f
    style MERGE fill:#15803d,color:#fff
    style REDIS fill:#a855f7,color:#fff
    style BATCH fill:#ffaa00,color:#0a0a0f

Component deep-dives

Ingest layer (Kafka)

Events are partitioned by item_id — all events for a given item go to the same Kafka partition, so each aggregator shard owns a disjoint slice of the item space. This is what makes the merge clean: each shard's count for any item it owns is the complete global count for that item, not a partial view that needs to be summed across nodes.

partition = murmur3(item_id) % num_partitions

Partition count should be chosen larger than the number of aggregator nodes so rebalancing is possible without reshuffling too many items.

Throughput. 5 M events/sec × 64 bytes/event = 320 MB/sec. With 32 partitions: ~10 MB/sec per partition. Kafka brokers on commodity hardware typically sustain 50–200 MB/sec depending on replication factor, message size, and disk type; 10 MB/sec per partition is comfortably within those limits.

Aggregator service

Each aggregator process consumes one or more Kafka partitions. Internally:

for each event batch from Kafka:
    for each (item_id, timestamp) in batch:
        tracker.observe(item_id)
    if elapsed > flush_interval:
        local_topk = tracker.heap.top(K_margin)  # K_margin > K
        push local_topk to merge service
        if window_expired:
            tracker.reset_window()  # rotate epoch bucket

K_margin is typically 2×–10× the desired global K. Because the aggregator owns a disjoint item partition, its local top-K is already globally accurate for those items — but the merge service still needs to rank across all shards.

The Count-Min Sketch within the aggregator is used to serve arbitrary item-frequency queries (not just top-K), if the API requires them.

Merge service

sequenceDiagram
    participant A1 as Aggregator 0
    participant A2 as Aggregator 1
    participant AN as Aggregator N
    participant MRG as Merge Service
    participant RD as Redis

    A1->>MRG: local_topk [(item, est_count), ...]
    A2->>MRG: local_topk [(item, est_count), ...]
    AN->>MRG: local_topk [(item, est_count), ...]
    MRG->>MRG: merge lists (union; sum if same item seen in 2 shards during rebalance)
    MRG->>MRG: keep top-K by total est_count
    MRG->>RD: SET top_k = [ranked list] EX 60

Because items are partitioned by item_id, an item normally appears in exactly one aggregator's list. The merge is a ranked-list union, not a sum. The merge service runs every few seconds (configurable), writing the result atomically to Redis as a single serialized payload.

If the item-to-partition mapping is not perfectly disjoint (e.g., using consistent hashing with virtual nodes that can occasionally move), the merge service may see the same item from two shards during a rebalance — in that case it sums the counts and deduplicates.

Query API

GET /v1/top-k?k=50&window=5m
→ 200 OK
{
  "window": "5m",
  "computed_at": "2026-04-21T14:22:00Z",
  "items": [
    { "id": "hashtag:WorldCup",  "count": 1840221, "rank": 1 },
    { "id": "hashtag:GameOfYear", "count": 1201009, "rank": 2 },
    ...
  ]
}

The query API reads from Redis (single GET, < 1 ms). The response is pre-computed; no sketch is queried on the read path.

For arbitrary frequency queries (GET /v1/frequency?item=hashtag:WorldCup&window=5m), the query fan-outs to all aggregator shards that own that item's partition, reads the sketch estimate, and returns it — typically < 5 ms.

Lambda-style batch audit

The stream pipeline is approximate. For dashboards and billing that need exact counts, raw events land in S3/data lake. A nightly Spark or Flink batch job recomputes exact top-K from the full event log for completed windows. Results are written to the audit store and can be used to verify that the streaming top-K was within the expected error bounds, backfill corrected counts in a reporting database, or alert if the streaming result diverged by more than a threshold (a possible sign of a bug or adversarial inflation).

This is the Lambda architecture pattern applied to heavy hitters: a speed layer (streaming, approximate) and a batch layer (exact, delayed).

Time-windowing strategies

StrategyMemory per shardAccuracy at boundaryRecency biasComplexity
Single tumbling epoch1× sketchAbrupt reset; event at T-1s is lostHard cutLow
Sliding ring buffer (W/T epochs)W/T × sketchSmooth, accurateProportionalMedium
Exponential decay1× sketch + decay factorNo hard boundaryStrong recency (configurable)Medium
Exact sliding windowO(N events) bufferPerfectPerfectToo expensive at scale

Recommendation. Tumbling epochs (1-minute buckets) with a short ring buffer (5 buckets for a 5-minute window) gives a good accuracy-memory trade-off and is straightforward to implement and reason about. Exponential decay is preferable when you want smooth "trending" scores rather than hard-window counts.

Storage choices

DataStoreRationale
Raw eventsKafka (retention 24h), then S3Kafka for real-time consumption; S3 for batch
Per-shard sketch stateIn-process memorySketch is tiny (≤1 MB); survives aggregator restart by replaying Kafka from offset
Current top-KRedis (single key, JSON or msgpack)Sub-ms reads; write is one SET every few seconds
Historical top-K snapshotsTime-series DB (InfluxDB, TimescaleDB) or S3Trending over time; low write rate
Exact audit countsClickHouse or BigQueryColumnar; analytical queries; updated once per batch cycle

Failure modes and mitigations

Hot key / adversarial inflation

If one item is spammed (bot traffic, coordinated hashtag campaign), its Count-Min estimate explodes and it dominates the top-K. There are three layers of defense. Rate-limit per source IP or device at the API gateway before events enter Kafka. Add a deduplication window: count unique users who posted the item rather than raw post events — deduplicate on (user_id, item_id) within a 1-minute window using a Bloom filter per shard. Finally, run a velocity check: if an item's count delta in the last 10 seconds exceeds 10× its baseline, flag it for review. This doesn't block it immediately (which could suppress legitimate viral content) but queues it for a secondary verification pass.

Aggregator crash / restart

On restart, the aggregator replays Kafka from the last committed offset. Because Kafka retains events for 24 hours (configurable), the in-memory sketch is rebuilt from the event log within seconds to minutes depending on window size. For large windows (e.g., 1 hour), this replay can take time; a checkpoint of the sketch to durable storage every few minutes bounds the recovery cost.

Merge approximation error — the shard-margin subtlety

Even with item-partitioned routing, a global top-K query can miss items during a shard rebalance or if consistent hashing moves an item's partition. The K_margin parameter (sending 2×–10× the target K from each shard to the merge service) bounds this risk. The tradeoff: larger margin means more merge traffic; smaller margin raises the risk of missing a global top-K item that sits just below the cutoff on its shard.

Window boundary effects

At the boundary between epochs, an item that was very popular in the expiring epoch and is still popular now will appear to have a sharp count drop when the old bucket is discarded. The sliding ring buffer keeps partial data from the expiring epoch weighted by the fraction of the window it overlaps, which smooths this out. Alternatively, decay-based windows have no hard boundary by design.

Sketch error propagation in merge

When two sketches are merged by element-wise addition, the error bounds add linearly: the merged sketch has error ε₁N₁ + ε₂N₂ where N₁ and N₂ are event counts in each sketch. In the partitioned design, items are not double-counted across shards, so the merge is a union rather than an addition — error does not compound across shards for individual items. This is why item-partitioned routing is the preferred design.

Accuracy-memory trade-off (Count-Min Sketch parameters)

Target ε (relative error)Width w = ⌈e/ε⌉d=5 rows, 4B/cell (δ=1%)Notes
10% of N28560 bytesVery low memory; high error
1% of N2725.4 KBTypical for coarse trending
0.1% of N271954 KBGood default
0.01% of N27183544 KBTight error; still sub-MB

N here is the total event count in the window. If your window has 10 B events and ε=0.1%, the maximum overcount error on any item is 10 M — large in absolute terms, but small relative to the items at the very top of the ranking (which likely have counts in the hundreds of millions at Twitter/Google scale).

For top-K queries specifically, the error that matters is whether the relative ordering of items near rank K is correct. In practice, items just below rank K typically have substantially lower counts than those at rank K (Zipfian distribution), so the error rarely causes misordering of the true top-K.

Things to discuss in an interview

  • Why can't you use a distributed exact hashmap? Not memory — you can add machines — but the merge cost: merging N partial hashmaps on every query is O(distinct items), which is the same problem you started with.
  • Why Count-Min over Space-Saving? CMS is mergeable (sketch addition), which makes the distributed pattern clean. Space-Saving is exact for top-K candidates but harder to merge across shards. In practice, many systems use CMS for distributed frequency estimation and Space-Saving locally on each shard for compact ranked-list output.
  • The shard margin problem. Always mention that the local top-K must include a margin beyond K before merging.
  • Trending vs. total popularity. Total popularity = all-time count (large counters, never reset). Trending = relative change in frequency over a short window. Trending usually requires windowed or decayed counts and a delta score (count_now / count_yesterday), not a raw count ranking.
  • How to handle the item space changing. New items appear constantly. Count-Min Sketch handles them transparently — no schema change needed. The heap naturally promotes newly popular items as their sketch estimates rise above the current minimum.
  • Idempotency. If Kafka delivers an event twice (at-least-once delivery), the sketch and heap both overcount. For dashboards this is acceptable; for billing it is not. Options: exactly-once Kafka consumers (Kafka transactions), or deduplicate on event_id using a short-TTL set before writing to the sketch.

Things you should now be able to answer

  • What is the error guarantee of a Count-Min Sketch, and in which direction does it err?
  • How do you tune the sketch width and depth for a target error probability?
  • Why does partitioning the event stream by item_id matter for merge correctness?
  • What is the shard-margin problem, and how does it arise even with correct partitioning?
  • How do tumbling epochs differ from a sliding ring buffer for trending?
  • Why does combining CMS with a min-heap give you both frequency estimation and top-K tracking?
  • When would you choose Space-Saving over Count-Min Sketch?
  • How does exponential decay produce a "trending" score rather than a raw count?

Further reading

  • Cormode & Muthukrishnan, "An Improved Data Stream Summary: The Count-Min Sketch and Its Applications" (2005) — the canonical paper.
  • Metwally, Agrawal & El Abbadi, "Efficient Computation of Frequent and Top-k Elements in Data Streams" (ICDT 2005) — Space-Saving algorithm.
  • The Bloom filter article — sibling probabilistic structure; useful for the deduplication pattern above.
  • Kafka documentation on partition assignment and consumer groups — the ingest-layer backbone.
  • Flink's TopKFunction operator — a production reference implementation of windowed heavy hitters in a stream processor.
// FAQ

Frequently asked questions

What is a Count-Min Sketch and how does it bound its error?

A Count-Min Sketch is a 2-D array of counters with d rows and w cells per row, using one independent hash function per row. On a query it returns the minimum counter value across all rows, which gives a tight upper bound: the estimate can overcount by at most epsilon times N (total events in the window), but it can never undercount. Width w and depth d are set by the target error rate via w = ceil(e / epsilon) and d = ceil(ln(1 / delta)).

Why does a plain hashmap fail for top-K at web scale, and how much memory does a Count-Min Sketch save?

A hashmap grows proportionally to distinct items: at 1 billion distinct items with 50 bytes per entry, that is 50 GB per aggregator node, which is unusable. A Count-Min Sketch with 7 rows and 4096 cells uses 112 KB per shard regardless of how many distinct items stream through it, fitting comfortably in L2 cache.

When should you use Space-Saving instead of Count-Min Sketch?

Space-Saving (Misra-Gries) guarantees that any item whose true count exceeds N divided by K+1 appears in the result, making it exact for top-K candidate detection with O(K) space. Count-Min Sketch is preferred in distributed designs because sketches built with identical hash seeds are mergeable by element-wise addition, while Space-Saving is harder to merge across shards. In practice many systems use Count-Min for distributed frequency estimation and Space-Saving locally on each shard for compact ranked-list output.

What is the shard-margin problem in distributed top-K, and how do you fix it?

If each aggregator sends only its local top-K to the merge service, a globally popular item that falls just below the local cutoff on its shard never reaches the merge step and is silently dropped from the global result. The fix is to send K_margin items from each shard, typically 2 to 10 times the target global K, so that borderline candidates are included in the merge.

How do tumbling epochs differ from a sliding ring buffer for trending, and which does the article recommend?

A tumbling epoch resets the sketch every T seconds, giving a hard count boundary where an event at the very end of a window contributes fully while the same event a second later counts in neither. A sliding ring buffer keeps W/T sketch buckets and drops the oldest on each rotation, smoothing boundary effects at the cost of W/T times the memory. The article recommends 1-minute tumbling buckets with a 5-bucket ring buffer for a 5-minute window as a good accuracy-memory trade-off, or exponential decay when a smooth trending score is preferred over hard-window counts.

// RELATED

You may also like