// Reflection Real Systems · Apache Cassandra
/**
* cassandra.ts — Apache Cassandra: Token Ring, Quorums & Gossip
*
* Run with: npx ts-node cassandra.ts
*
* Demonstrates Cassandra's approach to distribution:
* - Token ring with virtual nodes (vnodes)
* - Tunable consistency (QUORUM, ONE, ALL)
* - Node failure and recovery (gossip-based membership is modeled only as a manual UP/DOWN flag)
* - Read repair and hinted handoff
*/
import * as crypto from 'crypto'
/**
 * Map a partition key to a position on the token ring.
 *
 * Real Cassandra's default Murmur3Partitioner yields signed 64-bit tokens. This
 * simulation uses a stand-in instead: the first 64 bits (16 hex digits) of the
 * key's MD5 digest, read as an unsigned integer. The result lies in [0, 2^64),
 * the same range as the random vnode tokens generated by CassandraNode.
 *
 * @param key Partition key to place on the ring.
 * @returns Deterministic token in [0, 2^64).
 */
function hashToken(key: string): bigint {
  const digest = crypto.createHash('md5').update(key).digest('hex')
  return BigInt('0x' + digest.substring(0, 16))
}
// --- Types ---
/** How many replicas must acknowledge an operation for it to count as successful. */
type ConsistencyLevel = 'ONE' | 'QUORUM' | 'ALL'

/** Liveness of a node as tracked by the cluster. */
type NodeState = 'UP' | 'DOWN'

/** Outcome of a write: acknowledgements received versus acknowledgements required. */
interface WriteResult {
  ok: boolean
  writtenTo: number
  required: number
}

/**
 * Outcome of a read. `value` is present only when some replica returned data;
 * `timestamp` is that version's logical timestamp, or 0 when no version was returned.
 */
interface ReadResult {
  value?: string
  timestamp: number
  ok: boolean
}

/** Logical clock whose ticks serve as write timestamps for last-write-wins comparisons. */
let globalClock = 0n

/** Advance the logical clock by one and return the new reading (1n, 2n, 3n, ...). */
function tick(): bigint {
  globalClock += 1n
  return globalClock
}
// --- Cassandra Node ---
/**
 * One physical node in the simulation: a set of randomly placed vnode tokens,
 * a local key/value store with write timestamps, a map of held (hinted) writes,
 * and an UP/DOWN liveness flag.
 */
class CassandraNode {
  /** Vnode positions on the ring; each is a random unsigned 64-bit value. */
  tokens: bigint[]
  /** Locally stored rows: key -> value and the timestamp it was written with. */
  data = new Map<string, { value: string, timestamp: bigint }>()
  /** Writes held for later delivery to this node, keyed by partition key. */
  hintedHandoff = new Map<string, { value: string, timestamp: bigint, targetId: string }>()
  /** Liveness flag; nodes start UP. */
  state: NodeState = 'UP'
  /** Gossip state version; not read or updated anywhere in this file. */
  gossipVersion = 0

  /**
   * @param id Node identifier used in lookups and log output.
   * @param numTokens How many vnode tokens to place on the ring (default 4).
   */
  constructor(public id: string, numTokens = 4) {
    // One random 8-byte (64-bit) token per vnode.
    this.tokens = Array.from({ length: numTokens }, () => {
      const raw = crypto.randomBytes(8)
      return BigInt(`0x${raw.toString('hex')}`)
    })
  }
}
// --- Cassandra Cluster ---
/**
 * The simulated cluster: a token ring of CassandraNodes plus coordinator logic for
 * replica placement, tunable-consistency writes and reads, read repair, and
 * hinted handoff.
 */
class CassandraCluster {
  /** Size of the token space: key hashes and vnode tokens lie in [0, 2^64). */
  private static readonly RING_SIZE = 1n << 64n
  /** Replication factor used by write() and read(). */
  private static readonly REPLICATION_FACTOR = 3

  nodes: CassandraNode[] = []

  /** Add a node; its vnode tokens take part in replica placement immediately. */
  addNode(node: CassandraNode) {
    this.nodes.push(node)
  }

  /** Clockwise distance from `from` to `to` on the ring, in [0, RING_SIZE). */
  private static ringDistance(from: bigint, to: bigint): bigint {
    // Wrap-around uses the full ring size 2^64 (not 2^64 - 1) so distances are exact.
    return to >= from ? to - from : CassandraCluster.RING_SIZE - from + to
  }

  /** Smallest clockwise distance from `keyHash` to any of `node`'s vnode tokens. */
  private static distanceToNode(node: CassandraNode, keyHash: bigint): bigint {
    let best = CassandraCluster.RING_SIZE // larger than any real ring distance
    for (const token of node.tokens) {
      const d = CassandraCluster.ringDistance(keyHash, token)
      if (d < best) best = d
    }
    return best
  }

  /** Acknowledgements `cl` needs out of `replicaCount` replicas (never less than 1). */
  private static requiredAcks(cl: ConsistencyLevel, replicaCount: number): number {
    switch (cl) {
      case 'ONE':
        return 1
      case 'QUORUM':
        return Math.floor(replicaCount / 2) + 1
      case 'ALL':
        return Math.max(1, replicaCount)
      default: {
        const unknownLevel: never = cl
        throw new Error(`Unknown consistency level: ${String(unknownLevel)}`)
      }
    }
  }

  /**
   * Primary owner of `key`: the live node whose vnode token is nearest clockwise
   * from the key's hash. Ties go to the node that was added first.
   *
   * @throws Error when no node is UP.
   */
  findResponsible(key: string): CassandraNode {
    const [primary] = this.getReplicas(key, 1)
    if (primary === undefined) {
      throw new Error(`No live node available for key ${key}`)
    }
    return primary
  }

  /**
   * Replica nodes for `key`: the first `count` distinct nodes met when walking the
   * ring clockwise from the key's hash (i.e. ordered by each node's nearest token).
   *
   * @param key Partition key.
   * @param count Maximum number of replicas to return.
   * @param liveOnly When true (the default), DOWN nodes are skipped and the nearest
   *   live nodes are returned. When false, the natural replica set is returned
   *   regardless of liveness. write() and read() use this mode so a DOWN replica
   *   still counts toward the replication factor.
   */
  getReplicas(key: string, count: number, liveOnly = true): CassandraNode[] {
    const keyHash = hashToken(key)
    const candidates = liveOnly ? this.nodes.filter(n => n.state === 'UP') : this.nodes
    // Compare distances as bigint. Converting 64-bit values to Number drops the
    // low bits, which makes nearby tokens compare equal and mis-orders replicas.
    // map() copies, so sorting never reorders this.nodes; the sort is stable, so
    // equal distances keep insertion order.
    return candidates
      .map(node => ({ node, dist: CassandraCluster.distanceToNode(node, keyHash) }))
      .sort((a, b) => (a.dist < b.dist ? -1 : a.dist > b.dist ? 1 : 0))
      .slice(0, Math.max(0, count))
      .map(entry => entry.node)
  }

  /**
   * Write `key` = `value` at consistency level `cl` with RF = 3.
   *
   * Behavior:
   * - Every live replica receives the write.
   * - Every DOWN replica gets a hint in its own `hintedHandoff` map, which
   *   recoverNode() applies. This is a simplification: real Cassandra keeps hints
   *   on the coordinator.
   * - The write is applied even if `cl` is not met; `ok` only reports whether
   *   enough replicas acknowledged.
   */
  write(key: string, value: string, cl: ConsistencyLevel): WriteResult {
    const replicas = this.getReplicas(key, CassandraCluster.REPLICATION_FACTOR, false)
    const required = CassandraCluster.requiredAcks(cl, replicas.length)
    const ts = tick()
    let writtenTo = 0
    for (const replica of replicas) {
      if (replica.state === 'UP') {
        replica.data.set(key, { value, timestamp: ts })
        writtenTo++
      } else {
        // Hinted handoff: hold the write until the replica recovers.
        replica.hintedHandoff.set(key, { value, timestamp: ts, targetId: replica.id })
      }
    }
    return { ok: writtenTo >= required, writtenTo, required }
  }

  /**
   * Read `key` at consistency level `cl`.
   *
   * Behavior:
   * - Returns the newest version held by a live replica. Each live replica that
   *   holds an older copy, or no copy, is repaired to that version.
   * - A live replica without the key still counts as a response, so a key stored
   *   nowhere yields `{ ok: true, timestamp: 0 }` with no value.
   * - `ok` is false only when fewer live replicas are available than `cl` requires.
   */
  read(key: string, cl: ConsistencyLevel): ReadResult {
    const replicas = this.getReplicas(key, CassandraCluster.REPLICATION_FACTOR, false)
    const required = CassandraCluster.requiredAcks(cl, replicas.length)
    // DOWN replicas cannot answer, so their local maps must not be consulted.
    const responders = replicas.filter(r => r.state === 'UP')
    if (responders.length < required) return { ok: false, timestamp: 0 }

    const results: { value: string, timestamp: bigint, node: CassandraNode }[] = []
    for (const replica of responders) {
      const entry = replica.data.get(key)
      if (entry) results.push({ ...entry, node: replica })
    }
    if (results.length === 0) return { ok: true, timestamp: 0 }

    // Last write wins: keep the highest timestamp (a tie keeps the later replica).
    const latest = results.reduce((a, b) => (a.timestamp > b.timestamp ? a : b))

    // Read repair: bring every live replica up to the latest version.
    for (const replica of responders) {
      const entry = replica.data.get(key)
      if (entry === undefined) {
        console.log(` 🔧 Read repair: ${replica.id} was missing data, updating`)
      } else if (entry.timestamp < latest.timestamp) {
        console.log(` 🔧 Read repair: ${replica.id} had stale data, updating`)
      } else {
        continue
      }
      replica.data.set(key, { value: latest.value, timestamp: latest.timestamp })
    }
    return { ok: true, value: latest.value, timestamp: Number(latest.timestamp) }
  }

  /** Mark node `nodeId` DOWN; unknown ids are ignored. */
  failNode(nodeId: string) {
    const node = this.nodes.find(n => n.id === nodeId)
    if (node) {
      node.state = 'DOWN'
      console.log(`\n💥 Node ${nodeId} marked DOWN`)
    }
  }

  /**
   * Mark node `nodeId` UP and apply its queued hints, then clear the hint map.
   * A hint never overwrites a local copy that is at least as new (last write wins).
   * Unknown ids are ignored.
   */
  recoverNode(nodeId: string) {
    const node = this.nodes.find(n => n.id === nodeId)
    if (!node) return
    node.state = 'UP'
    console.log(`\n🔄 Node ${nodeId} recovered`)
    for (const [key, hint] of node.hintedHandoff) {
      if (hint.targetId !== nodeId) continue
      const current = node.data.get(key)
      if (current === undefined || current.timestamp < hint.timestamp) {
        node.data.set(key, { value: hint.value, timestamp: hint.timestamp })
        console.log(` 📨 Hinted handoff delivered: ${key} → ${hint.value}`)
      }
    }
    node.hintedHandoff.clear()
  }

  /** Print one line per node: id, liveness, vnode count and stored key count. */
  printState() {
    console.log('\n📊 Cluster State:')
    for (const node of this.nodes) {
      const status = node.state === 'UP' ? '✅' : '❌'
      console.log(` ${node.id} ${status} | vnodes: ${node.tokens.length} | keys: ${node.data.size}`)
    }
  }
}
// --- Simulation ---
/**
 * Scripted demo of the simulated cluster, printed to the console.
 *
 * - Phase 1 writes at CL=ONE, QUORUM and ALL.
 * - Phase 2 plants a stale copy on N2 and reads at QUORUM so read repair can fix it.
 * - Phase 3 fails N3, writes at QUORUM, then recovers N3 so any queued hint is
 *   delivered.
 */
function simulateCassandra() {
  console.log('=== APACHE CASSANDRA — TOKEN RING, QUORUMS & READ REPAIR ===\n')
  const cluster = new CassandraCluster()

  // Create 3 nodes with 4 vnodes each. With RF = 3, every node replicates every key.
  const n1 = new CassandraNode('N1', 4)
  const n2 = new CassandraNode('N2', 4)
  const n3 = new CassandraNode('N3', 4)
  cluster.addNode(n1)
  cluster.addNode(n2)
  cluster.addNode(n3)
  cluster.printState()

  // --- Phase 1: Write at different consistency levels ---
  console.log('\n=== PHASE 1: Writing with tunable consistency ===')
  console.log('\n-- Write at CL=ONE (fastest, weakest) --')
  let result = cluster.write('user:100', 'alice', 'ONE')
  console.log(` SET user:100 → ${result.writtenTo}/${result.required} replicas acknowledged`)
  console.log('\n-- Write at CL=QUORUM --')
  result = cluster.write('order:500', 'shipped', 'QUORUM')
  console.log(` SET order:500 → ${result.writtenTo}/${result.required} replicas acknowledged`)
  console.log('\n-- Write at CL=ALL (slowest, strongest) --')
  result = cluster.write('config:db', 'v3', 'ALL')
  console.log(` SET config:db → ${result.writtenTo}/${result.required} replicas acknowledged`)

  // --- Phase 2: Read with read repair ---
  console.log('\n=== PHASE 2: Read with read repair ===')
  // Plant a stale copy on N2. Its timestamp must be strictly older than the real
  // write: the logical clock starts at 0n and the user:100 write above is the
  // first tick (1n). Reusing 1n would tie, and read repair only fixes strictly
  // older copies.
  n2.data.set('user:100', { value: 'old_alice', timestamp: 0n })
  console.log('\n-- Read user:100 at CL=QUORUM (triggers read repair) --')
  const readResult = cluster.read('user:100', 'QUORUM')
  console.log(` GET user:100 → ${readResult.value} (read repaired)`)
  console.log(` N1 now has: ${n1.data.get('user:100')?.value}`)
  console.log(` N2 now has: ${n2.data.get('user:100')?.value} (was stale, now fixed)`)

  // --- Phase 3: Failure and hinted handoff ---
  console.log('\n=== PHASE 3: Failure and hinted handoff ===')
  cluster.failNode('N3')
  console.log('\n-- Writing during N3 failure --')
  result = cluster.write('product:42', 'gizmo', 'QUORUM')
  console.log(` SET product:42 → ${result.writtenTo}/${result.required} (hinted handoff queued for N3)`)
  cluster.recoverNode('N3')
  console.log('\n✅ Cassandra simulation complete.')
}
simulateCassandra()
// ⬇ Download cassandra.ts