// Distributed & Decentralized Systems Curriculum
// Reflection Real Systems · Apache Cassandra
/**
 * cassandra.ts — Apache Cassandra: Token Ring, Quorums & Gossip
 *
 * Run with: npx ts-node cassandra.ts
 *
 * Demonstrates Cassandra's approach to distribution:
 * - Token ring with virtual nodes (vnodes)
 * - Tunable consistency (QUORUM, ONE, ALL)
 * - Node liveness (UP/DOWN); real Cassandra spreads this via gossip, which is not simulated here
 * - Read repair and hinted handoff
 */

import * as crypto from 'crypto'

/**
 * Maps a partition key to an unsigned 64-bit ring token in [0, 2^64).
 *
 * Real Cassandra's default partitioner hashes keys with 64-bit Murmur3; this
 * simulation approximates that with the leading 8 bytes (16 hex digits) of an
 * MD5 digest of the key.
 */
function hashToken(key: string): bigint {
  const digest = crypto.createHash('md5').update(key).digest()
  const leadingHex = digest.subarray(0, 8).toString('hex')
  return BigInt(`0x${leadingHex}`)
}

// --- Types ---

/** How many replicas must acknowledge a read or write. */
type ConsistencyLevel = 'ONE' | 'QUORUM' | 'ALL'

/** Liveness of a node as seen by the cluster. */
type NodeState = 'UP' | 'DOWN'

/** Outcome of a write: acknowledgements received vs. acknowledgements needed. */
interface WriteResult {
  ok: boolean
  writtenTo: number
  required: number
}

/** Outcome of a read; `value` is optional and failed reads omit it. */
interface ReadResult {
  value?: string
  timestamp: number
  ok: boolean
}

// Logical clock shared by every write; its values serve as the timestamps
// compared when picking the newest version of a key.
let globalClock = 0n
/** Advances the shared logical clock and returns the new value (first call yields 1n). */
function tick(): bigint {
  globalClock += 1n
  return globalClock
}

// --- Cassandra Node ---

/**
 * One physical node in the simulated cluster.
 *
 * The node owns `numTokens` random positions (virtual nodes) on the 64-bit
 * token ring. It keeps its own key → { value, timestamp } store.
 */
class CassandraNode {
  id: string
  /** Ring positions owned by this node, one per vnode, each in [0, 2^64). */
  tokens: bigint[]
  /** Locally stored rows: key → stored value and the timestamp it was written with. */
  data: Map<string, { value: string, timestamp: bigint }> = new Map()
  /** Writes parked for later delivery, keyed by row key; `targetId` names the intended node. */
  hintedHandoff: Map<string, { value: string, timestamp: bigint, targetId: string }> = new Map()
  state: NodeState = 'UP'
  // NOTE(review): not read anywhere in this file; presumably reserved for a gossip model.
  gossipVersion: number = 0

  constructor(id: string, numTokens: number = 4) {
    this.id = id
    // Each vnode token is 8 random bytes read as an unsigned big-endian integer.
    const randomToken = (): bigint => {
      const hex = crypto.randomBytes(8).toString('hex')
      return BigInt(`0x${hex}`)
    }
    this.tokens = Array.from({ length: numTokens }, randomToken)
  }
}

// --- Cassandra Cluster ---

/**
 * A simulated Cassandra cluster: a token ring of CassandraNodes with RF-way
 * replication, tunable consistency, read repair and hinted handoff.
 */
class CassandraCluster {
  nodes: CassandraNode[] = []
  /** Replicas per key (Cassandra's replication factor). */
  replicationFactor: number = 3

  /** Size of the token space: tokens live in [0, 2^64). */
  private static readonly RING_SIZE = 1n << 64n

  addNode(node: CassandraNode) {
    this.nodes.push(node)
  }

  /** Clockwise distance from `from` to `to` on the ring, i.e. (to - from) mod 2^64. */
  private static ringDistance(from: bigint, to: bigint): bigint {
    return to >= from ? to - from : CassandraCluster.RING_SIZE - from + to
  }

  /**
   * Smallest clockwise distance from `keyHash` to any of `node`'s vnode tokens.
   * A node with no tokens gets RING_SIZE, which is farther than any real token.
   */
  private static nodeDistance(node: CassandraNode, keyHash: bigint): bigint {
    let best = CassandraCluster.RING_SIZE
    for (const token of node.tokens) {
      const dist = CassandraCluster.ringDistance(keyHash, token)
      if (dist < best) best = dist
    }
    return best
  }

  /** Acknowledgements needed for `cl` when a key has `replicaCount` replicas (never less than 1). */
  private static requiredAcks(cl: ConsistencyLevel, replicaCount: number): number {
    switch (cl) {
      case 'ONE':
        return 1
      case 'QUORUM':
        return Math.floor(replicaCount / 2) + 1
      case 'ALL':
        // An empty replica set must not trivially satisfy ALL.
        return Math.max(1, replicaCount)
      default: {
        const unexpected: never = cl
        throw new Error(`Unknown consistency level: ${String(unexpected)}`)
      }
    }
  }

  /**
   * Finds the UP node owning the vnode token nearest clockwise from the key's hash.
   *
   * DOWN nodes and nodes without tokens are skipped. If no node qualifies,
   * the first node in `nodes` is returned.
   *
   * @throws Error if the cluster has no nodes at all.
   */
  findResponsible(key: string): CassandraNode {
    const first = this.nodes[0]
    if (first === undefined) {
      throw new Error('findResponsible: cluster has no nodes')
    }
    const keyHash = hashToken(key)
    let closest = first
    let minDist: bigint | undefined

    for (const node of this.nodes) {
      if (node.state !== 'UP' || node.tokens.length === 0) continue
      const dist = CassandraCluster.nodeDistance(node, keyHash)
      // Strict `<` keeps the earlier node on ties.
      if (minDist === undefined || dist < minDist) {
        minDist = dist
        closest = node
      }
    }
    return closest
  }

  /**
   * Returns up to `count` distinct physical nodes for `key`. They are ordered
   * by how soon each is reached walking clockwise from the key's hash.
   *
   * @param key Partition key.
   * @param count Maximum number of nodes to return.
   * @param includeDown When false (the default) DOWN nodes are excluded. When
   *   true, the ring placement is returned regardless of liveness. `write` and
   *   `read` use this so a DOWN replica still counts toward RF and can be handed a hint.
   */
  getReplicas(key: string, count: number, includeDown: boolean = false): CassandraNode[] {
    const keyHash = hashToken(key)
    const candidates = includeDown ? this.nodes : this.nodes.filter(n => n.state === 'UP')

    // Compare distances as bigint: converting 64-bit values to Number loses precision.
    return candidates
      .map(node => ({ node, dist: CassandraCluster.nodeDistance(node, keyHash) }))
      .sort((a, b) => (a.dist < b.dist ? -1 : a.dist > b.dist ? 1 : 0))
      .slice(0, count)
      .map(entry => entry.node)
  }

  // --- Write with Tunable Consistency ---
  /**
   * Writes `key = value` to every live replica and parks a hint for each DOWN one.
   *
   * The required acknowledgement count is derived from the full replica
   * placement (up to `replicationFactor` nodes), not from how many are alive.
   * So a DOWN replica makes ALL fail instead of silently lowering the bar.
   * Writes that reach live replicas are kept even when `ok` is false.
   */
  write(key: string, value: string, cl: ConsistencyLevel): WriteResult {
    const replicas = this.getReplicas(key, this.replicationFactor, true)
    const required = CassandraCluster.requiredAcks(cl, replicas.length)

    const ts = tick()
    let writtenTo = 0

    for (const replica of replicas) {
      if (replica.state === 'UP') {
        replica.data.set(key, { value, timestamp: ts })
        writtenTo++
      } else {
        // Hinted handoff. Simplification: the hint is parked on the DOWN
        // replica's own object and replayed by recoverNode. Real Cassandra
        // keeps it on the coordinator.
        replica.hintedHandoff.set(key, {
          value, timestamp: ts, targetId: replica.id,
        })
      }
    }

    return { ok: writtenTo >= required, writtenTo, required }
  }

  // --- Read with Read Repair ---
  /**
   * Reads `key` from live replicas and returns the newest version.
   * Replicas holding an older version are repaired with it.
   *
   * Only UP replicas that hold the key count toward `cl`. A replica that
   * lacks the key is treated as not answering, and is not repaired.
   */
  read(key: string, cl: ConsistencyLevel): ReadResult {
    const replicas = this.getReplicas(key, this.replicationFactor, true)
    const required = CassandraCluster.requiredAcks(cl, replicas.length)

    const results: { value: string, timestamp: bigint, node: CassandraNode }[] = []

    for (const replica of replicas) {
      if (replica.state !== 'UP') continue // a DOWN node cannot answer
      const entry = replica.data.get(key)
      if (entry) {
        results.push({ ...entry, node: replica })
      }
    }

    // `required` is at least 1, so past this check `results` is non-empty.
    if (results.length < required) return { ok: false, timestamp: 0 }

    // Last write wins: the highest timestamp is the current value.
    const latest = results.reduce((a, b) =>
      a.timestamp > b.timestamp ? a : b
    )

    // Read repair: push the latest version to any replica that answered with stale data.
    for (const result of results) {
      if (result.timestamp < latest.timestamp) {
        console.log(`   🔧 Read repair: ${result.node.id} had stale data, updating`)
        result.node.data.set(key, { value: latest.value, timestamp: latest.timestamp })
      }
    }

    return { ok: true, value: latest.value, timestamp: Number(latest.timestamp) }
  }

  // --- Node Failure ---
  /** Marks the node with `nodeId` DOWN; unknown ids are ignored. */
  failNode(nodeId: string) {
    const node = this.nodes.find(n => n.id === nodeId)
    if (node) {
      node.state = 'DOWN'
      console.log(`\n💥 Node ${nodeId} marked DOWN`)
    }
  }

  /**
   * Marks the node with `nodeId` UP and replays the hints addressed to it.
   * Unknown ids are ignored.
   *
   * A hint is applied only if it is newer than what the node already holds
   * (last write wins). Delivered and stale hints are both removed.
   */
  recoverNode(nodeId: string) {
    const node = this.nodes.find(n => n.id === nodeId)
    if (!node) return

    node.state = 'UP'
    console.log(`\n🔋 Node ${nodeId} recovered`)

    // Deleting the current entry while iterating a Map is safe in JS.
    for (const [key, hint] of node.hintedHandoff) {
      if (hint.targetId !== nodeId) continue
      const current = node.data.get(key)
      if (current === undefined || current.timestamp < hint.timestamp) {
        node.data.set(key, { value: hint.value, timestamp: hint.timestamp })
        console.log(`   📨 Hinted handoff delivered: ${key} → ${hint.value}`)
      }
      node.hintedHandoff.delete(key)
    }
  }

  /** Prints one line per node: id, liveness, vnode count and stored key count. */
  printState() {
    console.log('\n📊 Cluster State:')
    for (const node of this.nodes) {
      console.log(`   ${node.id} ${node.state === 'UP' ? '✅' : '❌'} | vnodes: ${node.tokens.length} | keys: ${node.data.size}`)
    }
  }
}

// --- Simulation ---

/**
 * Runs the end-to-end demo and logs each step to the console.
 *
 * Phases: write one key at each consistency level; plant a stale value on N2
 * and read at QUORUM to trigger read repair; take N3 down, write, then
 * recover it so hinted handoff is replayed.
 */
function simulateCassandra() {
  console.log('=== APACHE CASSANDRA — TOKEN RING, QUORUMS & READ REPAIR ===\n')

  const cluster = new CassandraCluster()

  // Create 3 nodes (RF = 3)
  const n1 = new CassandraNode('N1', 4)
  const n2 = new CassandraNode('N2', 4)
  const n3 = new CassandraNode('N3', 4)
  cluster.addNode(n1)
  cluster.addNode(n2)
  cluster.addNode(n3)

  cluster.printState()

  // --- Phase 1: Write at different consistency levels ---
  console.log('\n=== PHASE 1: Writing with tunable consistency ===')

  console.log('\n-- Write at CL=ONE (fastest, weakest) --')
  let result = cluster.write('user:100', 'alice', 'ONE')
  console.log(`   SET user:100 → ${result.writtenTo}/${result.required} replicas acknowledged`)

  console.log('\n-- Write at CL=QUORUM --')
  result = cluster.write('order:500', 'shipped', 'QUORUM')
  console.log(`   SET order:500 → ${result.writtenTo}/${result.required} replicas acknowledged`)

  console.log('\n-- Write at CL=ALL (slowest, strongest) --')
  result = cluster.write('config:db', 'v3', 'ALL')
  console.log(`   SET config:db → ${result.writtenTo}/${result.required} replicas acknowledged`)

  // --- Phase 2: Read with read repair ---
  console.log('\n=== PHASE 2: Read with read repair ===')

  // Simulate a stale replica: timestamp 1n predates every timestamp handed out in Phase 1.
  n2.data.set('user:100', { value: 'old_alice', timestamp: 1n })

  console.log('\n-- Read user:100 at CL=QUORUM (triggers read repair) --')
  const readResult = cluster.read('user:100', 'QUORUM')
  console.log(`   GET user:100 → ${readResult.value} (read repaired)`)
  console.log(`   N1 now has: ${n1.data.get('user:100')?.value}`)
  console.log(`   N2 now has: ${n2.data.get('user:100')?.value} (was stale, now fixed)`)

  // --- Phase 3: Failure and hinted handoff ---
  console.log('\n=== PHASE 3: Failure and hinted handoff ===')
  cluster.failNode('N3')

  console.log('\n-- Writing during N3 failure --')
  result = cluster.write('product:42', 'gizmo', 'QUORUM')
  console.log(`   SET product:42 → ${result.writtenTo}/${result.required} (hinted handoff queued for N3)`)

  cluster.recoverNode('N3')

  console.log('\n✅ Cassandra simulation complete.')
}

simulateCassandra()

// ⬇ Download cassandra.ts