LSH: Locality Sensitive Hashing

Explore how LSH uses probabilistic hash functions to find similar vectors in sub-linear time, perfect for streaming and high-dimensional data.

Locality Sensitive Hashing (LSH) is a probabilistic technique that uses special hash functions to map similar vectors to the same hash buckets with high probability, enabling sub-linear time similarity search.

Interactive LSH Visualization

Explore how LSH uses random projections and hash functions to partition space for efficient similarity search:

[Interactive demo: click the canvas to place a query point (red), then click "Query"; green points are the candidates returned by the LSH hash tables.]

The Core Idea

Unlike traditional hash functions that avoid collisions, LSH deliberately causes collisions for similar items:

P[h(x) = h(y)] = sim(x, y)

The probability that two items hash to the same bucket increases with their similarity; for an ideal family it equals the similarity itself.

LSH Families

1. Random Projection (Cosine Similarity)

For cosine similarity, use random hyperplanes:

import numpy as np

def random_projection_hash(vector, hyperplanes):
    """Hash using random hyperplanes for cosine similarity."""
    hash_code = 0
    for i, hyperplane in enumerate(hyperplanes):
        # Set bit i if the vector lies on the positive side of hyperplane i
        if np.dot(vector, hyperplane) >= 0:
            hash_code |= (1 << i)
    return hash_code

Collision probability:

P[h(x) = h(y)] = 1 - θ(x, y)/π

Where θ(x,y) is the angle between vectors.
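
As a quick sanity check (a minimal sketch; the 64-dimensional Gaussian vectors and trial count are arbitrary), the empirical collision rate of a single random hyperplane closely tracks 1 - θ/π:

import numpy as np

rng = np.random.default_rng(0)
d = 64

x = rng.standard_normal(d)
y = x + 0.5 * rng.standard_normal(d)  # a vector correlated with x

cos = np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y))
theta = np.arccos(np.clip(cos, -1.0, 1.0))

# Estimate the collision probability over many single-hyperplane hashes
hyperplanes = rng.standard_normal((10_000, d))
collisions = (hyperplanes @ x >= 0) == (hyperplanes @ y >= 0)

print(f"empirical: {collisions.mean():.3f}")
print(f"theory:    {1 - theta / np.pi:.3f}")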

2. MinHash (Jaccard Similarity)

For set similarity, use min-wise independent permutations:

def minhash(set_items, num_hashes):
    """MinHash signature for Jaccard similarity."""
    signature = []
    for i in range(num_hashes):
        # Salting with the seed i simulates a different hash function per round
        min_hash = float('inf')
        for item in set_items:
            hash_val = hash((item, i))
            min_hash = min(min_hash, hash_val)
        signature.append(min_hash)
    return signature

Collision probability equals Jaccard similarity:

P[h(A) = h(B)] = |A ∩ B| / |A ∪ B| = J(A, B)
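
A quick check of this identity (a minimal sketch using the minhash function above; the example sets are arbitrary): the fraction of signature positions on which two sets agree approximates their exact Jaccard similarity.

A = {"cat", "dog", "fish", "bird"}
B = {"cat", "dog", "fish", "mouse"}

sig_a = minhash(A, num_hashes=512)
sig_b = minhash(B, num_hashes=512)

agreement = sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)
print(f"signature agreement: {agreement:.2f}")              # ≈ 0.60
print(f"exact Jaccard:       {len(A & B) / len(A | B):.2f}")  # 0.60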

3. p-Stable Distributions (Euclidean Distance)

For L2 distance, use Gaussian random projections:

def p_stable_hash(vector, a, b, w):
    """Hash using p-stable distributions for Lp distance."""
    # a: random vector drawn from a Gaussian (2-stable) distribution
    # b: random offset drawn uniformly from [0, w]
    # w: bucket width
    projection = np.dot(vector, a) + b
    # Floor (not truncate) so that negative projections bucket correctly
    return int(np.floor(projection / w))
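
A usage sketch (assuming the p_stable_hash function above; dimensions, bucket width, and noise scales are illustrative): points that are close in L2 distance usually fall into the same bucket, while distant points rarely do.

d, w = 32, 4.0
rng = np.random.default_rng(1)
a = rng.standard_normal(d)   # 2-stable (Gaussian) projection vector
b = rng.uniform(0, w)        # random offset in [0, w]

x = rng.standard_normal(d)
near = x + 0.01 * rng.standard_normal(d)
far = x + 10.0 * rng.standard_normal(d)

print(p_stable_hash(x, a, b, w) == p_stable_hash(near, a, b, w))  # usually True
print(p_stable_hash(x, a, b, w) == p_stable_hash(far, a, b, w))   # usually False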

Amplification Techniques

AND Amplification (Increase Precision)

Combine r hash functions; a candidate matches only if ALL of them agree:

P_AND = P^r

  • Reduces false positives
  • Increases false negatives
  • Makes matching stricter

OR Amplification (Increase Recall)

Use b independent hash tables; a candidate matches if ANY table agrees:

P_OR = 1 - (1 - P)^b

  • Increases true positives
  • Increases false positives
  • Makes matching more lenient

AND-OR Amplification

Combine both: b tables of r functions each:

P_final = 1 - (1 - P^r)^b

This creates an S-curve that sharpens the similarity threshold.
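
A small numeric sketch of that S-curve (the choices r = 5 and b = 20 are illustrative): per-hash collision probabilities below the threshold are squashed toward 0, those above it are pushed toward 1.

r, b = 5, 20  # hash functions per table, number of tables

def amplified(p, r=r, b=b):
    """AND-OR amplified collision probability."""
    return 1 - (1 - p ** r) ** b

for p in (0.3, 0.5, 0.7, 0.9):
    print(f"p = {p:.1f} -> P_final = {amplified(p):.3f}")
# p = 0.3 -> P_final = 0.047
# p = 0.5 -> P_final = 0.470
# p = 0.7 -> P_final = 0.975
# p = 0.9 -> P_final = 1.000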

Implementation

Basic LSH Index

class LSHIndex:
    def __init__(self, d, num_tables=5, hash_size=10):
        self.num_tables = num_tables
        self.hash_size = hash_size
        self.tables = [{} for _ in range(num_tables)]
        # Generate random hyperplanes for each table
        self.hyperplanes = []
        for _ in range(num_tables):
            planes = np.random.randn(hash_size, d)
            planes /= np.linalg.norm(planes, axis=1, keepdims=True)
            self.hyperplanes.append(planes)

    def _hash(self, vector, table_id):
        """Generate hash for vector in a specific table."""
        planes = self.hyperplanes[table_id]
        projections = np.dot(planes, vector)
        return tuple(projections >= 0)

    def insert(self, vector, item_id):
        """Insert vector into all hash tables."""
        for table_id in range(self.num_tables):
            hash_key = self._hash(vector, table_id)
            if hash_key not in self.tables[table_id]:
                self.tables[table_id][hash_key] = []
            self.tables[table_id][hash_key].append(item_id)

    def query(self, vector, max_results=10):
        """Find similar vectors."""
        candidates = set()
        for table_id in range(self.num_tables):
            hash_key = self._hash(vector, table_id)
            if hash_key in self.tables[table_id]:
                candidates.update(self.tables[table_id][hash_key])
        return list(candidates)[:max_results]
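
A usage sketch (random data; sizes are arbitrary): build the index, insert vectors, and query. The query point always collides with itself in every table, so it shows up among its own candidates.

rng = np.random.default_rng(42)
vectors = rng.standard_normal((1000, 64))

index = LSHIndex(d=64, num_tables=5, hash_size=10)
for item_id, v in enumerate(vectors):
    index.insert(v, item_id)

candidates = index.query(vectors[0], max_results=100)
print(0 in candidates)  # True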

Multi-Probe LSH

Improve recall by checking nearby buckets:

def multi_probe_query(self, vector, num_probes=3):
    """Query with multi-probe to increase recall."""
    candidates = set()
    for table_id in range(self.num_tables):
        base_hash = self._hash(vector, table_id)
        # Probe nearby buckets as well as the exact one
        for probe in generate_probes(base_hash, num_probes):
            if probe in self.tables[table_id]:
                candidates.update(self.tables[table_id][probe])
    return candidates

def generate_probes(hash_code, num_probes):
    """Generate nearby hash codes by flipping single bits."""
    probes = [hash_code]
    hash_list = list(hash_code)
    # Single-bit flips of the first num_probes positions
    for i in range(min(num_probes, len(hash_list))):
        probe = hash_list.copy()
        probe[i] = not probe[i]
        probes.append(tuple(probe))
    return probes

Performance Analysis

Time Complexity

Operation | Exact Search | LSH
Build     | O(N)         | O(N · L · k)
Query     | O(N · d)     | O(L · k + |candidates|)
Space     | O(N · d)     | O(N · L)

Where:

  • N = number of indexed vectors
  • L = number of hash tables
  • k = hash signature length
  • d = vector dimension

Probability Guarantees

For an (R, cR, p1, p2)-sensitive family:

P[h(x) = h(y)] ≥ p1  if d(x, y) ≤ R
P[h(x) = h(y)] ≤ p2  if d(x, y) ≥ c·R

with p1 > p2 and c > 1.

Parameter Selection

Optimal parameters for a dataset of N points:

r = log N / log(1/p2),  b = N^ρ

where ρ = log p1 / log p2 (equivalently log(1/p1) / log(1/p2)).
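
Plugging in numbers (a minimal sketch; the values p1 = 0.8 and p2 = 0.4 are illustrative): for N = 1,000,000 points this yields roughly 16 hash functions per table and about 29 tables.

import numpy as np

p1, p2, N = 0.8, 0.4, 1_000_000

rho = np.log(p1) / np.log(p2)                  # ≈ 0.244
r = int(np.ceil(np.log(N) / np.log(1 / p2)))   # 16 hash functions per table
b = int(np.ceil(N ** rho))                     # 29 tables

print(rho, r, b)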

Advanced Techniques

1. Cross-Polytope LSH

Better theoretical guarantees than random projection:

class CrossPolytopeLSH:
    def __init__(self, d):
        # Create an orthogonal basis (e.g., from a random rotation)
        self.basis = self._create_orthogonal_basis(d)

    def hash(self, vector):
        # Apply the (pseudo-)random rotation
        rotated = self.rotate(vector)
        # Hash to the nearest signed basis vector
        max_coord = np.argmax(np.abs(rotated))
        sign = np.sign(rotated[max_coord])
        return (max_coord, sign)

2. Data-Dependent LSH

Learn hash functions from data:

from sklearn.decomposition import PCA

class LearnedLSH:
    def __init__(self, training_data):
        # Learn projections from the data using PCA
        self.projections = PCA(n_components=32).fit(training_data)

    def hash(self, vector):
        # Project onto the learned subspace (transform expects a 2D array)
        projected = self.projections.transform(vector.reshape(1, -1))[0]
        # Threshold against the median to get a hashable binary code
        return tuple(projected > np.median(projected))

3. Distributed LSH

Scale to billions of vectors:

import asyncio

class DistributedLSH:
    def __init__(self, num_nodes):
        self.nodes = num_nodes
        self.tables = [RemoteHashTable(i) for i in range(num_nodes)]

    async def insert(self, vector, item_id):
        # Hash the item id to pick the owning node
        node_id = hash(item_id) % self.nodes
        await self.tables[node_id].insert(vector, item_id)

    async def query(self, vector):
        # Query all nodes in parallel
        tasks = [table.query(vector) for table in self.tables]
        results = await asyncio.gather(*tasks)
        # Merge candidate lists from all nodes
        return merge_results(results)

Comparison with Other Methods

LSH vs HNSW vs IVF-PQ

Feature     | LSH       | HNSW        | IVF-PQ
Build Speed | Very Fast | Slow        | Fast
Query Speed | Fast      | Very Fast   | Fast
Memory      | Low       | High        | Very Low
Recall      | Moderate  | Excellent   | Good
Updates     | Instant   | Incremental | Batch
Theory      | Strong    | Limited     | Limited

When to Use LSH

Use LSH when:

  • Streaming data (instant updates)
  • Need theoretical guarantees
  • Very high dimensions (>1000)
  • Simple implementation required
  • Memory is limited

Avoid LSH when:

  • Need very high recall (>95%)
  • Small dataset (<10K vectors)
  • Can afford complex preprocessing
  • Latency is absolutely critical

Real-World Applications

1. Near-Duplicate Detection

Finding similar documents at web scale:

from datasketch import MinHash, MinHashLSH

def create_shingles(text, k=3):
    """Overlapping character k-grams of a document."""
    return {text[i:i + k] for i in range(len(text) - k + 1)}

def detect_near_duplicates(documents, threshold=0.8):
    lsh = MinHashLSH(threshold=threshold, num_perm=128)
    duplicates = []
    for doc_id, content in enumerate(documents):
        # Compute the MinHash signature of the document's shingle set
        signature = MinHash(num_perm=128)
        for shingle in create_shingles(content, k=3):
            signature.update(shingle.encode())
        # Report matches already in the index; otherwise index this document
        similar = lsh.query(signature)
        if similar:
            duplicates.append((doc_id, similar))
        else:
            lsh.insert(doc_id, signature)
    return duplicates

2. Recommendation Systems

Real-time similar item recommendations:

def cosine_similarity(u, v):
    return np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v))

class LSHRecommender:
    def __init__(self, num_recommendations=10):
        self.lsh = LSHIndex(d=100, num_tables=10)
        self.num_recs = num_recommendations

    def fit(self, item_embeddings):
        self.item_embeddings = item_embeddings  # kept for exact re-ranking
        for item_id, embedding in enumerate(item_embeddings):
            self.lsh.insert(embedding, item_id)

    def recommend(self, user_embedding):
        # Stage 1: cheap candidate generation via LSH
        candidates = self.lsh.query(user_embedding, max_results=100)
        # Stage 2: exact re-ranking of the small candidate set
        scores = []
        for item_id in candidates:
            similarity = cosine_similarity(user_embedding, self.item_embeddings[item_id])
            scores.append((item_id, similarity))
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:self.num_recs]

3. Genomic Sequence Matching

Finding similar DNA sequences:

def sequence_lsh(sequence, k=10):
    """LSH (MinHash) signature for genomic sequences."""
    # Extract overlapping k-mers
    kmers = [sequence[i:i + k] for i in range(len(sequence) - k + 1)]
    # One minimum hash value per seed (20 hash functions)
    signature = []
    for seed in range(20):
        min_hash = min(hash((kmer, seed)) for kmer in kmers)
        signature.append(min_hash)
    return signature
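
A usage sketch (the sequences are made up): the fraction of agreeing signature positions estimates the Jaccard similarity of the two sequences' k-mer sets.

seq_a = "ACGTACGTACGTTAGCTAGCTAGGACGTACGTAATT"
seq_b = "ACGTACGTACGTTAGCTAGCTAGGACGTACGTAGGT"  # differs near the end

sig_a = sequence_lsh(seq_a, k=10)
sig_b = sequence_lsh(seq_b, k=10)

agreement = sum(a == b for a, b in zip(sig_a, sig_b)) / len(sig_a)
print(f"estimated k-mer similarity: {agreement:.2f}")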

Implementation Tips

1. Hash Function Design

def create_hash_family(similarity_type):
    if similarity_type == 'cosine':
        # Random hyperplanes
        return RandomProjectionHash(num_projections=10)
    elif similarity_type == 'jaccard':
        # MinHash
        return MinHashFamily(num_permutations=128)
    elif similarity_type == 'euclidean':
        # p-stable distributions
        return PStableHash(bucket_width=2.0)

2. Optimal Table Configuration

def estimate_parameters(n, target_recall=0.9):
    """Estimate LSH parameters for a target recall."""
    # Empirical rule of thumb assuming a per-table hit probability of 0.5
    L = int(np.log(1 - target_recall) / np.log(0.5))
    k = int(np.log(n) / np.log(2))
    return {
        'num_tables': L,
        'hash_size': k,
        'expected_recall': 1 - (0.5 ** L),
    }
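
For example, one million vectors at a 90% recall target (note that int() rounds down, so the estimated recall can land slightly below the target; round L up if that matters):

params = estimate_parameters(1_000_000, target_recall=0.9)
print(params)
# {'num_tables': 3, 'hash_size': 19, 'expected_recall': 0.875}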

3. Dynamic Rehashing

class AdaptiveLSH:
    def __init__(self):
        self.tables = []
        self.threshold = 100  # maximum items per bucket

    def insert(self, vector, item_id):
        # Insert into all existing tables
        for table in self.tables:
            table.insert(vector, item_id)
        # Rehash if any bucket has grown too large
        if self.max_bucket_size() > self.threshold:
            self.rehash()

    def rehash(self):
        """Add a new table with different hash functions."""
        new_table = create_hash_table()
        # Redistribute items from overloaded buckets
        for table in self.tables:
            for bucket in table.heavy_buckets():
                for item in bucket:
                    new_table.insert(item)
        self.tables.append(new_table)

References

  • Indyk, P. & Motwani, R. "Approximate Nearest Neighbors: Towards Removing the Curse of Dimensionality." STOC 1998.
  • Andoni, A. & Indyk, P. "Near-Optimal Hashing Algorithms for Approximate Nearest Neighbor in High Dimensions." Communications of the ACM, 2008.
  • Bawa, M., Condie, T. & Ganesan, P. "LSH Forest: Self-Tuning Indexes for Similarity Search." WWW 2005.
  • datasketch: MinHash LSH library (Python)
