Approximating Distinct Elements in a Stream

This post explains a probabilistic counting algorithm with which one can estimate the number of distinct elements in a large collection of data in a single pass.

This post is based on notes by Alex Tsun and the paper by Flajolet and Martin (1985). It is remarkable that an algorithm discovered in the 1980s, when memory was very expensive, now has a huge number of applications in networking, databases, and computer systems in the big-data era (Flajolet & Martin, 1985).

Motivating problem

For one of the most popular videos on YouTube, let's say there are $N = 2$ billion views, with $n = 900$ million of them being distinct views. How much space is required to track this number exactly? Well, let's assume a user ID is an 8-byte integer. Then we need $900{,}000{,}000 \times 8$ bytes if we use a set to track the user IDs, which is about $7.2$ gigabytes of memory for ONE video.

Intuitions and examples

When we count distinct values $n$, we notice that the count has the following properties: it is non-negative and it never decreases.

For instance, we cannot have $-2$ distinct values (distinct users). Furthermore, the number of distinct values only increases or stays constant: if we have $7$ distinct users at the current moment, this count will either grow or remain the same as the stream continues.

Instead of maintaining a set of distinct values and querying whether each new value is unique, we can rely on hashing and probability to estimate the number of distinct values with $O(1)$ space.

Figure 1. Illustration of using a hash function to count the number of distinct values.

The idea behind approximating distinct elements in a stream is to hash every element to a random-looking value in $[0, 1]$ and keep track of the minimum hash value seen so far, as illustrated in Figure 1. Duplicates hash to the same value, so the minimum depends only on the number of distinct elements.

Probabilistic reasoning

If $U_1, \cdots, U_m \sim \mathrm{Unif}(0, 1)$ (continuous) are iid, and $X = \min\{U_1, \cdots, U_m\}$ is their minimum, where $m$ is the number of distinct values, then

$$ \mathbb{E}[X] = \frac{1}{m+1} \tag{1} $$
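As a quick sanity check of Equation (1) before proving it (not part of the original derivation), a tiny Monte Carlo simulation agrees with $1/(m+1)$; the values of $m$, the number of trials, and the seed below are arbitrary illustrative choices.

```python
import random

random.seed(0)             # arbitrary seed for reproducibility
m, trials = 9, 100_000     # arbitrary illustrative choices

# Empirical mean of the minimum of m iid Unif(0, 1) draws.
avg_min = sum(min(random.random() for _ in range(m)) for _ in range(trials)) / trials

print(avg_min)      # empirically close to 0.1
print(1 / (m + 1))  # 0.1, the value predicted by Equation (1)
```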

To prove the above statement, we start by working with probabilities (e.g., the CDF $F_X(x) = \mathbb{P}(X \leq x)$) and then take the derivative to find the PDF (a common strategy for dealing with continuous random variables).

$$ \begin{aligned} \mathbb{P}(X > x) & = \mathbb{P}(\min\{U_1, \cdots, U_m\} > x) \\ & = \mathbb{P}(U_1 > x, U_2 > x, \cdots, U_m > x) \\ & = \prod_{i = 1}^m \mathbb{P}(U_i > x) \\ & = \prod_{i=1}^m (1-x) \\ & = (1-x)^m \tag{2} \end{aligned} $$

For the second equality, we use the fact that the minimum of several numbers is greater than a value if and only if all of them are; the third equality uses the independence of the $U_i$.

Now, we have that

$$ F_X(x) = \mathbb{P}(X \leq x) = 1 - \mathbb{P}(X > x) = 1 - (1-x)^m \tag{3} $$

Now, taking the derivative of Equation (3), we get the PDF

$$ f_X(x) = m(1-x)^{m-1} \tag{4} $$

Now we can calculate the expectation of the random variable $X$:

$$ \begin{aligned} \mathbb{E}[X] & = \int_0^1 x f_X(x) \, dx \\ & = \int_0^1 x \, m(1-x)^{m-1} \, dx \\ & = m \int_0^1 x (1-x)^{m-1} \, dx \\ & = m \int_1^0 (u-1) u^{m-1} \, du \quad (u = 1-x;\ du = -dx) \\ & = m \int_1^0 (u^m - u^{m-1}) \, du \\ & = m \left[ \frac{1}{m+1}u^{m+1} - \frac{1}{m} u^m \right]_1^0 \\ & = m \left[ \frac{1}{m} - \frac{1}{m+1} \right] \\ & = \frac{1}{m+1} \tag{5} \end{aligned} $$

Equation (5) shows that we can recover $m$ from the expected minimum as follows:

$$ m := \frac{1}{\mathbb{E}[X]} - 1 $$

For instance, if $\mathbb{E}[X] = 0.32$, the number of distinct values can be estimated as

$$ m := \frac{1}{0.32} - 1 = 3.125 - 1 = 2.125 $$
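To make the idea concrete, here is a minimal sketch of the single-hash estimator. The helper `hash_to_unit` and the toy stream are illustrative assumptions (MurmurHash via the `mmh3` package, the same library used in the implementation at the end of this post); as the variance analysis below shows, the minimum of one hash function gives a very noisy estimate.

```python
import mmh3  # MurmurHash3 (pip install mmh3)

def hash_to_unit(x):
    """Map an element to a pseudo-random value in [0, 1) via a 32-bit hash."""
    return mmh3.hash(str(x), signed=False) / 2**32

def estimate_distinct(stream):
    """Track the minimum hash value over the stream, then reverse-solve m ≈ 1/min - 1."""
    min_val = 1.0
    for item in stream:
        min_val = min(min_val, hash_to_unit(item))
    return 1.0 / min_val - 1.0

print(estimate_distinct([13, 25, 19, 25, 19, 19]))  # a (noisy) estimate of 3
```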

Next, we derive the variance of the random variable $X$ using the formula

$$ \mathrm{Var}[X] = \mathbb{E}[X^2] - \mathbb{E}[X]^2 $$

First, we define the random variable $Y = X^2$ and derive its CDF and PDF.

$$ \begin{aligned} F_Y(y) = \mathbb{P}(Y \leq y) & = \mathbb{P}(X^2 \leq y) \\ & = \mathbb{P}(-\sqrt{y} \leq X \leq \sqrt{y}) \\ & = \mathbb{P}(0 \leq X \leq \sqrt{y}) \\ & = 1 - (1-\sqrt{y})^m \quad (\text{using Equation (3)}) \end{aligned} $$

Taking the derivative, we get the PDF,

$$ f_Y(y) = \frac{m}{2} (1 - y^{1/2})^{m-1} y^{-1/2} \tag{6} $$

Therefore,

$$ \begin{aligned} \mathbb{E}[Y] & = \int_0^1 y f_Y(y) \, dy \\ & = \int_0^1 y \frac{m}{2} (1 - y^{1/2})^{m-1} y^{-1/2} \, dy \\ & = \frac{m}{2} \int_0^1 y^{1/2} (1 - y^{1/2})^{m-1} \, dy \\ & = m \int_0^1 x^2 (1-x)^{m-1} \, dx \quad (y = x^2) \\ & = -m \int_1^0 (1-u)^2 u^{m-1} \, du \quad (u = 1-x) \\ & = \frac{2}{(m+1)(m+2)} \tag{7} \end{aligned} $$

Then we have

$$ \mathrm{Var}[X] = \frac{2}{(m+1)(m+2)} - \frac{1}{(m+1)^2} = \frac{m}{(m+1)^2(m+2)} \tag{8} $$

Therefore, we can state

$$ \mathrm{Var}[X] < \frac{1}{(m+1)^2} \tag{9} $$
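A quick simulation (again with arbitrary illustrative parameters) agrees with Equation (8) and the bound in Equation (9):

```python
import random
import statistics

random.seed(1)             # arbitrary illustrative choices
m, trials = 9, 200_000

mins = [min(random.random() for _ in range(m)) for _ in range(trials)]

print(statistics.pvariance(mins))   # empirical variance of the minimum
print(m / ((m + 1)**2 * (m + 2)))   # Equation (8)
print(1 / (m + 1)**2)               # Equation (9), the looser upper bound
```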

With Chebyshev's inequality, we can show

$$ \mathrm{Pr}(|X - \mathbb{E}[X]| \geq \epsilon \mathbb{E}[X]) \leq \frac{\mathrm{Var}[X]}{(\epsilon \mathbb{E}[X])^2} < \frac{1}{\epsilon^2} $$

This means the bound is vacuous for any $\epsilon < 1$, so this method on its own is not very accurate. How can we improve its accuracy? We can leverage the law of large numbers.

If $X_1, \cdots, X_n$ are iid RVs with mean $\mu$ and variance $\sigma^2$, the sample mean $\bar{X}_n$ has the same mean but lower variance, by linearity of expectation and of variance for independent variables:

$$ \begin{aligned} \mathbb{E}[\bar{X}_n] & = \mathbb{E}\left[ \frac{1}{n} \sum_{i=1}^n X_i \right] = \frac{1}{n} \sum_{i=1}^n \mathbb{E}[X_i] = \frac{1}{n} n \mu = \mu \\ \mathrm{Var}[\bar{X}_n] & = \mathrm{Var}\left[ \frac{1}{n} \sum_{i=1}^n X_i \right] = \frac{1}{n^2} \sum_{i=1}^n \mathrm{Var}(X_i) = \frac{1}{n^2} n \sigma^2 = \frac{\sigma^2}{n} \tag{10} \end{aligned} $$

With Equation (10) in mind, we can use $k$ hash functions instead of one, as in the following table.

| Stream $\to$ | 13 | 25 | 19 | 25 | 19 | 19 | min val$_i$ |
|---|---|---|---|---|---|---|---|
| $h_1$ | 0.51 | 0.26 | 0.79 | 0.26 | 0.79 | 0.79 | 0.26 |
| $h_2$ | 0.22 | 0.83 | 0.53 | 0.83 | 0.53 | 0.53 | 0.22 |
| $\dots$ | $\dots$ | $\dots$ | $\dots$ | $\dots$ | $\dots$ | $\dots$ | $\dots$ |
| $h_k$ | 0.27 | 0.44 | 0.72 | 0.44 | 0.72 | 0.72 | 0.27 |

Now, for improved accuracy, we simply take the average of the $k$ minimums first, before reverse-solving.
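A sketch of this averaged estimator, assuming the $k$ hash functions are obtained by seeding MurmurHash with different seeds (an illustrative choice, not the only way to get independent hash functions):

```python
import mmh3

def estimate_distinct_avg(stream, k=64):
    """Average the minimums of k seeded hash functions, then reverse-solve."""
    mins = [1.0] * k
    for item in stream:
        for seed in range(k):
            u = mmh3.hash(str(item), seed, signed=False) / 2**32  # value in [0, 1)
            if u < mins[seed]:
                mins[seed] = u
    avg_min = sum(mins) / k  # estimate of E[X], with variance reduced by a factor of k
    return 1.0 / avg_min - 1.0

print(estimate_distinct_avg(range(1000), k=64))  # roughly 1000
```

Each element is hashed $k$ times, so the update cost grows with $k$, but the memory is still only $k$ floating-point minimums.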

Applying Chebyshev's inequality to the sample mean $\bar{X}$ of the $k$ minimums, we can show

$$ \mathrm{Pr}(|\bar{X} - \mathbb{E}[\bar{X}]| \geq \epsilon \mathbb{E}[\bar{X}]) \leq \frac{\mathrm{Var}[\bar{X}]}{(\epsilon \mathbb{E}[\bar{X}])^2} < \frac{1}{k \epsilon^2} $$

If we want the failure probability to be at most $\delta$, we can set the value of $k$ as

$$ k = \frac{1}{\delta \epsilon^2} \tag{11} $$

Median trick

In Equation (11), the space (the number of hash functions we must store) grows by a factor of 20 if we set our failure rate to $\delta = 0.05$:

$$ k = \frac{1}{0.05 \times \epsilon^2} = \frac{20}{\epsilon^2} $$

If we want to reduce the space further, we can use the "median-of-means" technique for error and success amplification, and analyze it using a Chernoff bound (Thomas Vidick, CMS139 notes).

Now, we set $t = O(\log 1/\delta)$ and run $t$ trials, each with failure probability $\delta = 1/5$; by Equation (11), each trial then needs $k = 5/\epsilon^2$ hash functions.

Now, let $\hat{d}_1, \hat{d}_2, \cdots, \hat{d}_t$ be the outcomes of the $t$ trials, i.e., the estimated numbers of distinct elements. Then we set

$$ \hat{d} = \mathrm{median}(\hat{d}_1, \hat{d}_2, \cdots, \hat{d}_t) $$

Recall that a median $\tilde{\mu}$ of a random variable $X$ is any point such that

$$ \mathrm{Pr}(X \leq \tilde{\mu}) = \mathrm{Pr}(X \geq \tilde{\mu}) = \frac{1}{2} \tag{12} $$

Assume the true value is $d$, and consider the error range

$$ [(1-4\epsilon)d,\ (1+4\epsilon)d] $$

Then, if more than half of the trials fall in the above range, property (12) of the median tells us that the median must also fall in the range. In particular, if at least $2/3$ of the trials fall in the range, the median falls in too.

Since the per-trial failure probability is $\delta = 1/5$, each of the $t$ trials falls in the error range with probability at least $4/5$ (that is exactly what the failure probability means). Now we want to bound the probability that the median $\hat{d}$ falls outside the error range we set up.

Let the random variable $X$ be the number of trials falling in $[(1-4\epsilon)d, (1+4\epsilon)d]$; then we have

$$ \mathrm{Pr}\left( \hat{d} \notin [(1-4\epsilon)d, (1+4\epsilon)d] \right) \leq \mathrm{Pr}\left( X < \frac{2}{3} t \right) \tag{13} $$

Equation (13) says that the probability that the median $\hat{d}$ does not fall in the range is bounded by $\mathrm{Pr}\left( X < \frac{2}{3} t \right)$, by the median argument above.

Since we set $\delta = 1/5$, each trial falls in the range with probability at least $4/5$, so we have

$$ \begin{aligned} \mathbb{E}[X] & = \mathbb{E}\left[ \sum_{i=1}^t I_i \right] \quad (I_i := \text{indicator that trial } i \text{ falls in the range}) \\ & = \sum_{i=1}^t \mathbb{E}[I_i] \\ & = \sum_{i=1}^t 1 \cdot \mathrm{Pr}(\text{trial } i \text{ falls in the range}) \\ & \geq \frac{4}{5} t \tag{14} \end{aligned} $$

Now, substituting Equation (14) into Equation (13), we have

$$ \begin{aligned} \mathrm{Pr}\left( \hat{d} \notin [(1-4\epsilon)d, (1+4\epsilon)d] \right) & \leq \mathrm{Pr}\left( X < \frac{2}{3} t \right) \\ & \leq \mathrm{Pr}\left( X < \frac{5}{6} \cdot \mathbb{E}[X] \right) \\ & = \mathrm{Pr}\left( X - \mathbb{E}[X] < - \frac{1}{6} \cdot \mathbb{E}[X] \right) \\ & = \mathrm{Pr}\left( \mathbb{E}[X] - X > \frac{1}{6} \cdot \mathbb{E}[X] \right) \\ & \leq \mathrm{Pr}\left( | X - \mathbb{E}[X] | \geq \frac{1}{6} \cdot \mathbb{E}[X] \right) \end{aligned} $$

where the second inequality uses $\frac{2}{3} t \leq \frac{5}{6} \mathbb{E}[X]$, which follows from Equation (14). Now, apply the following Chernoff bound (there are variations of this bound; see Wikipedia):

$$ \mathrm{Pr}(|X - \mu| \geq \epsilon \mu) \leq 2 \exp\left( \frac{-\epsilon^2 \mu}{3} \right), $$

and we have

$$ \mathrm{Pr}\left( | X - \mathbb{E}[X] | \geq \frac{1}{6} \cdot \mathbb{E}[X] \right) \leq 2 \exp\left( \frac{-(1/6)^2 \cdot (4/5)t}{3} \right) = O(e^{-ct}) \tag{15} $$

Since we set $t = O(\log 1/\delta)$, Equation (15) shows that the overall failure probability can be driven down to $\delta$.
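To make the constant concrete under the specific bound in Equation (15) (a worked example, not part of the original argument): there $c = (1/6)^2 \cdot (4/5) / 3 = 1/135$, so it suffices to choose $t$ with

$$ 2 e^{-t/135} \leq \delta \iff t \geq 135 \ln\frac{2}{\delta}. $$

For example, a target failure probability of $\delta = 0.01$ is met by $t \approx 715$ trials; the constant is far from optimized, and the point is only that $t$ grows like $\log(1/\delta)$.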

In summary, we run $t = O(\log 1/\delta)$ trials, each using $k = 1/(\epsilon^2 \delta)$ hash functions. With the per-trial failure probability $\delta = 1/5$, the total space (the number of hash functions we need to store) is

$$ k \cdot t = \frac{5t}{\epsilon^2} = O\left( \frac{\log(1/\delta)}{\epsilon^2} \right) $$

This means the space complexity does not depend on the number of distinct elements $d$ or the number of items in the stream $n$ (both of which are typically very large).
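Putting the pieces together, here is a hedged end-to-end sketch of the median-of-means estimator described above, with $t$ groups of $k = 5/\epsilon^2$ seeded hash functions; the helper name, seeding scheme, and default parameters are illustrative assumptions rather than a canonical implementation.

```python
import statistics
import mmh3

def estimate_distinct_mom(stream, eps=0.2, t=9):
    """Median-of-means estimator: t groups, each averaging k = 5/eps^2 hash minimums."""
    k = int(5 / eps**2)                   # hash functions per group (per-trial delta = 1/5)
    mins = [[1.0] * k for _ in range(t)]  # one tracked minimum per (group, hash function)
    for item in stream:
        for g in range(t):
            for j in range(k):
                seed = g * k + j          # a distinct seed per hash function
                u = mmh3.hash(str(item), seed, signed=False) / 2**32
                if u < mins[g][j]:
                    mins[g][j] = u
    # Reverse-solve each group's averaged minimum, then take the median across groups.
    estimates = [1.0 / (sum(group) / k) - 1.0 for group in mins]
    return statistics.median(estimates)

print(estimate_distinct_mom(range(2000)))  # should land reasonably near 2000
```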

Algorithms in practice

Right now, our algorithm uses continuous-valued, fully random hash functions. However, these cannot be implemented exactly, so we need to carry the analysis over to a discrete setting.

In practice, it is pretty amazing that we can actually reduce the space down to $O(\log \log n)$ bits (maybe not that amazing, as we are only extracting the zeroth frequency moment from one pass over the data). This family of algorithms is often referred to as "approximate counting", "probabilistic counting", or simply the "LogLog algorithm" (with HyperLogLog being the most recent version (Flajolet et al., 2007)).

Before implementing the Flajolet-Martin algorithm, let's frame counting as a random process. When we talk about "counting", the first thing that comes to mind is probably a sequence of numbers: $1, 2, 3, \cdots, N$. However, any sequence can naturally be viewed as a counting process.

Now, let's do something that looks like magic: every time a new element comes in, we flip a coin; if it is a head, we mark it as $1$ and put it into a vector of length 32; if it is a tail, we mark it as $0$ and put it into the vector.

Figure 2. Illustration of a hash function as a machine for flipping coins.

In their paper, Flajolet and Martin (1985) noticed that if the values the hash function returns are uniformly distributed, a run of exactly $k$ consecutive trailing tails ($0$s) appears with probability $2^{-(k+1)}$, which is aligned with our common sense: if we flip a coin three times, what is the probability of getting 3 heads? It is $1/8 = 2^{-3}$.

Now, let's run a simulation with Python. We run the simulation with different numbers of elements and calculate the fraction of hashes that have exactly 3 trailing zeros when each element is hashed to a bit string of length 32.
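The exact script behind Figure 3 is not reproduced here, so the following is only a minimal sketch of such a simulation, using `mmh3` to hash the integers $0, 1, \dots$ as stand-ins for distinct elements:

```python
import mmh3

def fraction_with_trailing_zeros(num_elements, k=3):
    """Fraction of hashed elements whose 32-bit hash has exactly k trailing zero bits."""
    hits = 0
    for i in range(num_elements):
        h = mmh3.hash(str(i), signed=False)
        bits = format(h, '032b')                      # fixed-width 32-bit string
        trailing = len(bits) - len(bits.rstrip('0'))  # number of trailing '0' characters
        hits += (trailing == k)
    return hits / num_elements

for n in (10**3, 10**4, 10**5):
    print(n, fraction_with_trailing_zeros(n))  # settles around 1/2**4 = 0.0625
```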

Figure 3. Plot of simulations that calculate the probability of having 3 consecutive trailing zeros when we hash different numbers of distinct elements.

Figure 3 shows that the probability of having $3$ consecutive trailing zeros stabilizes around $0.06153$ once we have $1 \times 10^5$ distinct values hashed to bit strings of length 32, which is close to

$$ \frac{1}{2^4} = 0.0625 $$

It is this pattern that gave Flajolet and Martin the idea for their probabilistic counting algorithm: the probability that a hash output ends with the bit pattern of $2^k$ (a one followed by $k$ zeros, such as $\dots 1000$ for $k = 3$) is $2^{-(k+1)}$, since this corresponds to flipping $k$ tails and then a head with a fair coin.

Now we will use this idea to count the distinct values. We assume that we have a hash function $h$ that maps each element to a bitstring of length $s$ (here $s = 32$), i.e.

$$ h: [n] \to \{0, 1\}^s $$

We assume that $h$ is truly random, which means that for each $i \in [n]$, $h(i)$ is chosen uniformly at random from $\{0, 1\}^s$.

Next, for each $i \in [n]$, let $r(i)$ be the number of trailing $0$s in $h(i)$. For example,

$$ h(42) = 10111100010110001010010000110110 \quad \text{(1 trailing zero)} $$

so in this case $r(42) = 1$. A useful observation about this quantity $r(i)$ is that

$$ \mathrm{Pr}[r(i) \geq k] = \frac{1}{2^k} \tag{16} $$

Remark: Equation (16) gives the probability of at least $k$ trailing zeros, whereas Figure 3 only shows the probability of exactly $k = 3$. Here is the algorithm.

import mmh3  # MurmurHash3 (pip install mmh3)

def _calculate_trailing0s(bit_str):
    """Count the number of trailing '0' characters in a bit string."""
    return len(bit_str) - len(bit_str.rstrip('0'))

def fm_algorithm(num=1000):
    """
    Estimate the number of distinct elements with the Flajolet-Martin algorithm.
    source: https://en.wikipedia.org/wiki/Flajolet%E2%80%93Martin_algorithm
    """
    bitmap = '0' * 32
    for i in range(num):
        # Hash each element to a 32-bit integer and inspect its binary representation.
        hash_int = mmh3.hash(str(i), signed=False)
        hash_str = "{0:032b}".format(hash_int)
        count_trailing0s = _calculate_trailing0s(hash_str)

        # Set BITMAP[r] = 1, where r is the number of trailing zeros of this hash.
        bitmap = bitmap[:count_trailing0s] + '1' + bitmap[count_trailing0s+1:]

    # R = index of the first bit still unset; the estimate is 2^R.
    r = bitmap.find('0')

    return 2**r

As we analyzed above, using one hash function does not give a very accurate estimate. We need to deploy more hash functions and use the median trick to improve the accuracy, for example as sketched below.
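One hedged way to combine several seeded copies of the bitmap estimator with the median trick is sketched here; the grouping constants and seeding scheme are illustrative assumptions, not the canonical Flajolet-Martin correction.

```python
import statistics
import mmh3

def fm_estimate(stream, seed):
    """One Flajolet-Martin bitmap estimate using a single seeded hash function."""
    bitmap = ['0'] * 32
    for item in stream:
        h = mmh3.hash(str(item), seed, signed=False)
        bits = format(h, '032b')
        r = min(len(bits) - len(bits.rstrip('0')), 31)  # trailing zeros (guard the rare all-zero hash)
        bitmap[r] = '1'
    return 2 ** ''.join(bitmap).index('0')  # 2^(index of the first unset bit)

def fm_median_of_means(stream, groups=5, per_group=8):
    """Median of group means of independent FM estimates (illustrative constants)."""
    items = list(stream)  # this sketch makes several passes; a real version would make one
    means = []
    for g in range(groups):
        estimates = [fm_estimate(items, seed=g * per_group + j) for j in range(per_group)]
        means.append(sum(estimates) / per_group)
    return statistics.median(means)

print(fm_median_of_means(range(1000)))  # rough estimate of 1000, up to a small constant factor
```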

Figure 4. Plot of counting distinct elements with the Flajolet-Martin algorithm.
References

  1. Flajolet, P., Fusy, É., Gandouet, O., & Meunier, F. (2007). HyperLogLog: The analysis of a near-optimal cardinality estimation algorithm. Discrete Mathematics and Theoretical Computer Science, 137–156.
  2. Flajolet, P., & Martin, G. N. (1985). Probabilistic counting algorithms for data base applications. Journal of Computer and System Sciences, 31(2), 182–209.