// Distributed & Decentralized Systems Curriculum
// Reflection Real Systems · Riak KV
/**
 * riak.ts — Riak: Vector Clocks, Consistent Hashing & Read Repair
 *
 * Run with: npx ts-node riak.ts
 *
 * Demonstrates Riak's approach to distribution:
 * - Consistent hashing with virtual nodes (vnodes)
 * - Vector clocks for write conflict detection
 * - Read repair / hinted handoff
 * - Last-write-wins (LWW) vs sibling-aware resolution
 */

// --- Types ---

/**
 * Vector clock: one logical counter per node id.
 *
 * A node that has no entry is treated exactly like a node whose counter is 0
 * when two clocks are compared.
 */
class VectorClock {
  counters: Map<string, number>

  /** Seeds the clock from `[nodeId, count]` pairs; a repeated id keeps its last count. */
  constructor(initial: [string, number][] = []) {
    this.counters = new Map<string, number>(initial)
  }

  /** Adds one to the counter of `nodeId`, starting from 0 when it is absent. */
  increment(nodeId: string): void {
    const current = this.counters.get(nodeId) || 0
    this.counters.set(nodeId, current + 1)
  }

  /**
   * Orders this clock relative to `other`.
   *
   * @returns  1 when this clock is ahead on at least one node and behind on none,
   *          -1 when it is behind on at least one node and ahead on none,
   *           0 when each clock is ahead somewhere (concurrent),
   *           2 when every counter matches.
   */
  compare(other: VectorClock): number {
    const nodeIds = [...new Set([...this.counters.keys(), ...other.counters.keys()])]
    const countIn = (clock: VectorClock, id: string): number => clock.counters.get(id) || 0

    const ahead = nodeIds.some(id => countIn(this, id) > countIn(other, id))
    const behind = nodeIds.some(id => countIn(other, id) > countIn(this, id))

    if (ahead && behind) return 0  // concurrent (conflict)
    if (ahead) return 1            // this is descendant
    if (behind) return -1          // other is descendant
    return 2                       // equal
  }
}

/** One stored version of a value together with the metadata used to order versions. */
interface RiakObject {
  /** Key the object is stored under in a vnode's data map. */
  key: string
  /** The stored payload. */
  value: string
  /** Vector clock attached to this version. */
  vectorClock: VectorClock
  /** Numeric write timestamp; writers in this file use Date.now() (the demo also plants 0 on a stale copy). */
  lastWriteTime: number
}

// --- Virtual Node (vnode) ---

/** A virtual node: a named slot on the ring with its own key/value store. */
class VNode {
  /** Identifier of this vnode. */
  id: string
  /** Name of the physical node this vnode belongs to. */
  physicalNode: string
  /** Objects held by this vnode, indexed by key. */
  data: Map<string, RiakObject>

  /** Creates an empty vnode called `id` that lives on `physicalNode`. */
  constructor(id: string, physicalNode: string) {
    this.data = new Map<string, RiakObject>()
    this.id = id
    this.physicalNode = physicalNode
  }
}

// --- Token Ring (Consistent Hashing) ---

/**
 * Simplified consistent-hash ring.
 *
 * The ring is the ordered `vnodes` array: a key hashes to a start index and its
 * replicas are the vnodes that follow that index, wrapping around at the end.
 */
class RiakRing {
  vnodes: VNode[] = []
  // Real Riak maps keys onto a 160-bit hash space that is split into a fixed
  // number of partitions (64 by default), each owned by one vnode.
  // For our simulation, we use a smaller ring with hash % size

  constructor(vnodes: VNode[]) {
    this.vnodes = vnodes
  }

  /**
   * Consistent hashing: find the vnode responsible for a key.
   *
   * @throws RangeError when the ring has no vnodes.
   */
  findVNode(key: string): VNode {
    const vnode = this.vnodes[this.startIndex(key)]
    if (vnode === undefined) {
      throw new RangeError(`RiakRing has no vnode at the position computed for key "${key}"`)
    }
    return vnode
  }

  /**
   * PrefList: the vnode responsible for `key` followed by its successors on
   * the ring, `n` entries in total (N = replication factor).
   *
   * The list is capped at the ring size so the same vnode is never returned
   * twice when `n` is larger than the number of vnodes.
   *
   * @throws RangeError when the ring has no vnodes.
   */
  findPrefList(key: string, n: number = 3): VNode[] {
    const startIndex = this.startIndex(key)
    const ringSize = this.vnodes.length
    const limit = Math.min(n, ringSize)
    const result: VNode[] = []

    for (let i = 0; i < limit; i++) {
      const vnode = this.vnodes[(startIndex + i) % ringSize]
      if (vnode !== undefined) result.push(vnode)
    }

    return result
  }

  /** Ring position of `key`; fails loudly instead of dividing by zero on an empty ring. */
  private startIndex(key: string): number {
    const ringSize = this.vnodes.length
    if (ringSize === 0) {
      throw new RangeError('RiakRing has no vnodes; cannot place a key on an empty ring')
    }
    return Number(this.hashString(key) % BigInt(ringSize))
  }

  /**
   * Polynomial string hash (hash * 31 + code unit) over the UTF-16 code units
   * of `str`, truncated to 64 bits after every step. The result is never negative.
   */
  private hashString(str: string): bigint {
    let hash = 0n
    for (let i = 0; i < str.length; i++) {
      const char = BigInt(str.charCodeAt(i))
      hash = ((hash << 5n) - hash) + char  // (hash << 5) - hash === hash * 31
      hash = hash & 0xFFFFFFFFFFFFFFFFn
    }
    return hash
  }
}

// --- Riak Client ---

/**
 * In-process coordinator for the simulated cluster.
 *
 * Reads and writes go to the key's preference list on `ring`. Vnodes whose
 * physical node is marked down in `physicalNodes` are skipped. Versions are
 * ordered with their vector clocks; `lastWriteTime` is only used to break ties
 * between equal clocks and by `getLWW`.
 */
class RiakClient {
  ring: RiakRing
  replicationFactor: number = 3
  physicalNodes: Map<string, boolean> = new Map()  // nodeId → alive
  stats: { reads: number, writes: number, conflicts: number, repairs: number } =
    { reads: 0, writes: 0, conflicts: 0, repairs: 0 }

  /**
   * @param ring               ring used to place keys
   * @param replicationFactor  number of vnodes each key is written to and read from
   */
  constructor(ring: RiakRing, replicationFactor: number = 3) {
    this.ring = ring
    this.replicationFactor = replicationFactor

    // Every physical node that owns a vnode starts out alive
    for (const vnode of ring.vnodes) {
      this.physicalNodes.set(vnode.physicalNode, true)
    }
  }

  // --- Write with Vector Clocks ---

  /**
   * Writes `value` under `key` to every vnode of the preference list.
   *
   * The new clock is the pointwise maximum of the clocks found on the live
   * replicas, with `clientNodeId`'s counter incremented, so the write descends
   * from every version the coordinator could see. For a replica whose physical
   * node is down, the object is parked on a live fallback vnode instead.
   */
  put(key: string, value: string, clientNodeId: string): void {
    this.stats.writes++
    const prefList = this.ring.findPrefList(key, this.replicationFactor)

    // Fold every visible replica clock into a fresh clock
    const causalContext = new VectorClock()
    for (const vnode of prefList) {
      if (!this.isAlive(vnode)) continue
      const existing = vnode.data.get(key)
      if (existing) this.mergeInto(causalContext, existing.vectorClock)
    }

    // Increment the coordinator's counter in the vector clock
    causalContext.increment(clientNodeId)

    // One object is shared by all replicas; it is never mutated after this point
    const obj: RiakObject = {
      key,
      value,
      vectorClock: causalContext,
      lastWriteTime: Date.now(),
    }

    // Write to all available replicas (hinted handoff for unavailable ones)
    for (const vnode of prefList) {
      if (this.isAlive(vnode)) {
        vnode.data.set(key, obj)
        continue
      }

      console.log(`   ⚠️  Hinted handoff for ${key} — ${vnode.physicalNode} is down`)
      const fallback = this.findHandoffTarget(vnode, prefList)
      if (fallback) {
        fallback.data.set(key, obj)
      }
    }

    console.log(`   ✏️  PUT ${key} = "${value}" (client ${clientNodeId}) | clock: ${this.formatClock(causalContext)}`)
  }

  // --- Read with Sibling Resolution ---

  /**
   * Reads `key` from the live vnodes of its preference list.
   *
   * Versions whose clock is an ancestor of another replica's clock are
   * discarded. If more than one version survives, the clocks are concurrent:
   * the call reports a conflict and returns one sibling per distinct clock for
   * the caller to resolve. Otherwise the surviving version is returned and
   * every live replica that is missing it or holds an older version is
   * overwritten (read repair).
   *
   * @returns `value` is `'<not found>'` when no live replica holds the key.
   */
  get(key: string): { value: string, isConflict: boolean, siblings?: [string, VectorClock][] } {
    this.stats.reads++
    const prefList = this.ring.findPrefList(key, this.replicationFactor)

    // Remember which vnode answered with what, including "nothing stored"
    const replicas: { vnode: VNode, obj: RiakObject | undefined }[] = []
    const found: RiakObject[] = []
    for (const vnode of prefList) {
      if (!this.isAlive(vnode)) continue
      const obj = vnode.data.get(key)
      replicas.push({ vnode, obj })
      if (obj) found.push(obj)
    }

    // Keep only versions that no other version descends from, one per clock
    const frontier: RiakObject[] = []
    for (const candidate of found) {
      const superseded = found.some(
        other => candidate.vectorClock.compare(other.vectorClock) === -1
      )
      if (superseded) continue

      const twinIndex = frontier.findIndex(
        kept => kept.vectorClock.compare(candidate.vectorClock) === 2
      )
      const twin = frontier[twinIndex]
      if (twin === undefined) {
        frontier.push(candidate)
      } else if (candidate.lastWriteTime > twin.lastWriteTime) {
        // Equal clocks: the more recent timestamp represents the version
        frontier[twinIndex] = candidate
      }
    }

    const [latest] = frontier
    if (latest === undefined) {
      return { value: '<not found>', isConflict: false }
    }

    if (frontier.length > 1) {
      // Siblings found! Return all for client resolution
      this.stats.conflicts++
      const siblings: [string, VectorClock][] = frontier.map(r => [r.value, r.vectorClock])
      console.log(`   🔀 GET ${key} — CONFLICT (${siblings.length} siblings)`)
      return {
        value: `[CONFLICT: ${frontier.map(r => r.value).join(' vs ')}]`,
        isConflict: true,
        siblings,
      }
    }

    console.log(`   📖 GET ${key} = "${latest.value}" | clock: ${this.formatClock(latest.vectorClock)}`)

    // Read repair: bring missing or outdated live replicas up to date
    for (const { vnode, obj } of replicas) {
      if (obj === latest) continue

      let needsRepair = obj === undefined
      if (obj !== undefined) {
        const order = obj.vectorClock.compare(latest.vectorClock)
        needsRepair = order === -1 || (order === 2 && obj.lastWriteTime < latest.lastWriteTime)
      }
      if (!needsRepair) continue

      vnode.data.set(key, latest)
      this.stats.repairs++
      console.log(`   🔧 Read repair on ${vnode.physicalNode} — updated to "${latest.value}"`)
    }

    return { value: latest.value, isConflict: false }
  }

  // --- LWW Mode (last-write-wins, no siblings) ---

  /**
   * Returns the value with the greatest `lastWriteTime` among the live
   * replicas, ignoring vector clocks, or `'<not found>'` when none holds the key.
   */
  getLWW(key: string): string {
    this.stats.reads++
    const prefList = this.ring.findPrefList(key, this.replicationFactor)

    // Track the object itself so a timestamp of 0 (or below) can still win
    let newest: RiakObject | undefined
    for (const vnode of prefList) {
      if (!this.isAlive(vnode)) continue
      const obj = vnode.data.get(key)
      if (obj && (newest === undefined || obj.lastWriteTime > newest.lastWriteTime)) {
        newest = obj
      }
    }

    return newest ? newest.value : '<not found>'
  }

  // --- Failure Simulation ---

  /** Marks physical node `nodeId` as down; its vnodes are skipped by reads and writes. */
  failNode(nodeId: string): void {
    this.physicalNodes.set(nodeId, false)
    console.log(`\n💥 ${nodeId} FAILED`)
  }

  /**
   * Marks physical node `nodeId` as alive again. Nothing is copied here; its
   * vnodes are brought up to date by read repair on later `get` calls.
   */
  recoverNode(nodeId: string): void {
    this.physicalNodes.set(nodeId, true)
    console.log(`\n🔄 ${nodeId} RECOVERED — anti-entropy will repair data`)
  }

  /** True only when the vnode's physical node is known and marked alive. */
  private isAlive(vnode: VNode): boolean {
    return this.physicalNodes.get(vnode.physicalNode) === true
  }

  /** Raises each counter of `target` to at least the matching counter of `source`. */
  private mergeInto(target: VectorClock, source: VectorClock): void {
    for (const [node, count] of source.counters) {
      target.counters.set(node, Math.max(count, target.counters.get(node) ?? 0))
    }
  }

  /**
   * Walks the ring forward from `down` and returns the first live vnode that
   * is not already part of `prefList`, or `undefined` when there is none.
   */
  private findHandoffTarget(down: VNode, prefList: VNode[]): VNode | undefined {
    const ringNodes = this.ring.vnodes
    const start = ringNodes.indexOf(down)
    for (let step = 1; step < ringNodes.length; step++) {
      const candidate = ringNodes[(start + step) % ringNodes.length]
      if (candidate && this.isAlive(candidate) && !prefList.includes(candidate)) {
        return candidate
      }
    }
    return undefined
  }

  /** Renders a clock as `[node:count, ...]` in the map's insertion order (used for logging). */
  private formatClock(vc: VectorClock): string {
    return `[${[...vc.counters.entries()].map(([k, v]) => `${k}:${v}`).join(', ')}]`
  }

  /** Prints the operation counters collected so far. */
  printStats(): void {
    console.log('\n📊 Riak Cluster Stats:')
    console.log(`   Writes: ${this.stats.writes}`)
    console.log(`   Reads:  ${this.stats.reads}`)
    console.log(`   Conflicts: ${this.stats.conflicts}`)
    console.log(`   Repairs:   ${this.stats.repairs}`)
  }
}

// --- Simulation ---

/**
 * Console walkthrough of the simulated cluster: basic reads and writes, a
 * concurrent-write conflict and its resolution, a node failure with hinted
 * handoff, and read repair of a stale replica.
 */
function simulateRiak() {
  console.log('=== RIAK KV — VECTOR CLOCKS & CONSISTENT HASHING ===\n')

  // Create 12 vnodes spread round-robin across 4 physical nodes
  const nodes = ['N1', 'N2', 'N3', 'N4']
  const vnodes: VNode[] = []
  for (let i = 0; i < 12; i++) {
    const physicalNode = nodes[i % nodes.length]
    vnodes.push(new VNode(`vnode${i}`, physicalNode))
  }

  console.log(`Created ${vnodes.length} vnodes across ${nodes.length} physical nodes`)

  const ring = new RiakRing(vnodes)
  const client = new RiakClient(ring, 3)

  // --- Phase 1: Basic writes and reads ---
  console.log('\n=== PHASE 1: Basic operations ===')
  client.put('user:42', 'Alice', 'N1')
  client.put('user:99', 'Bob', 'N2')

  const r1 = client.get('user:42')
  console.log(`   Result: user:42 = "${r1.value}"`)

  // --- Phase 2: Vector clock conflicts (concurrent writes) ---
  console.log('\n=== PHASE 2: Concurrent writes → vector clock conflict ===')

  // Write from N1 (clock: N1:1)
  client.put('cart:100', 'item:A', 'N1')
  // Write from N2 that never saw N1's update (clock: N2:1).
  // put() stores the same object on every live replica, so two put() calls
  // cannot leave the replicas divergent; the partitioned write is therefore
  // placed directly on one replica, the same way Phase 4 plants a stale copy.
  // The two clocks are concurrent → conflict!
  const [, partitionedReplica] = ring.findPrefList('cart:100', 3)
  if (!partitionedReplica) {
    throw new Error('expected at least two replicas for cart:100')
  }
  partitionedReplica.data.set('cart:100', {
    key: 'cart:100',
    value: 'item:B',
    vectorClock: new VectorClock([['N2', 1]]),
    lastWriteTime: Date.now(),
  })

  const r2 = client.get('cart:100')
  console.log(`   Result: cart:100 = "${r2.value}"`)

  // Client resolves the conflict
  if (r2.isConflict && r2.siblings) {
    console.log(`   🏆 Client chose "item:A" as the winner`)
    client.put('cart:100', 'item:A', 'N3')
  }

  const r3 = client.get('cart:100')
  console.log(`   After resolution: cart:100 = "${r3.value}"`)

  // --- Phase 3: Node failure and hinted handoff ---
  console.log('\n=== PHASE 3: Node failure with hinted handoff ===')
  client.failNode('N2')
  client.put('order:500', 'processing', 'N1')
  client.recoverNode('N2')

  const r4 = client.get('order:500')
  console.log(`   After recovery: order:500 = "${r4.value}"`)

  // --- Phase 4: Read repair ---
  console.log('\n=== PHASE 4: Read repair ===')
  // Write to all nodes
  client.put('settings:theme', 'dark', 'N1')
  // Simulate a stale replica (via direct manipulation of one vnode)
  const [, strayVNode] = ring.findPrefList('settings:theme', 3)
  if (!strayVNode) {
    throw new Error('expected at least two replicas for settings:theme')
  }
  strayVNode.data.set('settings:theme', {
    key: 'settings:theme',
    value: 'light',
    vectorClock: new VectorClock([['N0', 0]]),
    lastWriteTime: 0,
  })

  // Read triggers read repair
  const r5 = client.get('settings:theme')
  console.log(`   After read repair: settings:theme = "${r5.value}"`)
  console.log(`   Stale vnode is now: ${strayVNode.data.get('settings:theme')?.value}`)

  // --- Summary ---
  console.log('\n=== SUMMARY ===')
  client.printStats()
  console.log('\n✅ Riak simulation complete.')
}

// Entry point: run the whole simulation as soon as this file is executed
simulateRiak()

// ⬇ Download riak.ts