// Reflection Real Systems · Riak KV
/**
* riak.ts — Riak: Vector Clocks, Consistent Hashing & Read Repair
*
* Run with: npx ts-node riak.ts
*
* Demonstrates Riak's approach to distribution:
* - Consistent hashing with virtual nodes (vnodes)
* - Vector clocks for write conflict detection
* - Read repair / hinted handoff
* - Last-write-wins (LWW) vs sibling-aware resolution
*/
// --- Types ---
/**
 * A vector clock: one monotonically increasing counter per node that has
 * coordinated a write. Nodes missing from `counters` are treated as 0.
 */
class VectorClock {
  counters: Map<string, number>

  /** Seed the clock from `[nodeId, count]` pairs; a later duplicate overrides an earlier one. */
  constructor(initial: [string, number][] = []) {
    this.counters = new Map(initial)
  }

  /** Record one more event coordinated by `nodeId`. */
  increment(nodeId: string): void {
    const current = this.counters.get(nodeId) || 0
    this.counters.set(nodeId, current + 1)
  }

  /**
   * Causal comparison against `other`.
   * @returns 2 if the clocks are equal, 1 if this clock descends from `other`,
   *          -1 if `other` descends from this clock, 0 if they are concurrent.
   */
  compare(other: VectorClock): number {
    const nodeIds = new Set<string>(this.counters.keys())
    for (const id of other.counters.keys()) nodeIds.add(id)

    let ahead = false  // some counter here exceeds other's
    let behind = false // some counter in other exceeds ours
    for (const id of nodeIds) {
      const mine = this.counters.get(id) || 0
      const theirs = other.counters.get(id) || 0
      if (mine > theirs) {
        ahead = true
      } else if (theirs > mine) {
        behind = true
      }
    }

    if (ahead === behind) {
      // both → concurrent (conflict); neither → identical histories
      return ahead ? 0 : 2
    }
    return ahead ? 1 : -1
  }
}
/** One stored version of a key, as held by a single vnode. */
interface RiakObject {
  /** Key this version was written under. */
  key: string
  /** Vector clock describing this version's causal history. */
  vectorClock: VectorClock
  /** Write time in milliseconds since the epoch when produced by `Date.now()`. */
  lastWriteTime: number
  /** Stored payload. */
  value: string
}
// --- Virtual Node (vnode) ---
/**
 * A virtual node: one partition of the ring together with its local key/value
 * store, hosted on a single physical node.
 */
class VNode {
  /** Objects stored in this partition, keyed by object key. */
  data: Map<string, RiakObject> = new Map()

  /**
   * @param id           label of this virtual partition
   * @param physicalNode physical node that hosts this vnode
   */
  constructor(public id: string, public physicalNode: string) {}
}
// --- Token Ring (Consistent Hashing) ---
class RiakRing {
  vnodes: VNode[] = []

  // Real Riak hashes each bucket/key pair with SHA-1 onto a 160-bit ring that
  // is split into `ring_creation_size` equal partitions (64 by default). This
  // simulation instead reduces a 64-bit string hash modulo the vnode count.
  constructor(vnodes: VNode[]) {
    this.vnodes = vnodes
  }

  /**
   * Consistent hashing: find the vnode responsible for `key`.
   * This is always the first entry of `findPrefList(key, n)`.
   * @throws Error if the ring has no vnodes
   */
  findVNode(key: string): VNode {
    return this.vnodes[this.slotFor(key)]
  }

  /**
   * PrefList: the `n` consecutive vnodes starting at the key's owner, walking
   * the ring in index order (N = replication factor). `n` is clamped to the
   * ring size so that no vnode appears twice.
   * @throws Error if the ring has no vnodes
   */
  findPrefList(key: string, n: number = 3): VNode[] {
    const start = this.slotFor(key)
    const count = Math.min(n, this.vnodes.length)
    const result: VNode[] = []
    for (let i = 0; i < count; i++) {
      result.push(this.vnodes[(start + i) % this.vnodes.length])
    }
    return result
  }

  /**
   * Ring slot owning `key`. Fails with a descriptive error on an empty ring
   * instead of the bare BigInt division-by-zero RangeError.
   */
  private slotFor(key: string): number {
    if (this.vnodes.length === 0) {
      throw new Error(`RiakRing has no vnodes; cannot place key "${key}"`)
    }
    return Number(this.hashString(key) % BigInt(this.vnodes.length))
  }

  // 64-bit variant of the classic `h * 31 + c` string hash
  // ((h << 5) - h === 31h); the mask keeps the result in [0, 2^64).
  private hashString(str: string): bigint {
    let hash = 0n
    for (let i = 0; i < str.length; i++) {
      const char = BigInt(str.charCodeAt(i))
      hash = ((hash << 5n) - hash) + char
      hash = hash & 0xFFFFFFFFFFFFFFFFn
    }
    return hash
  }
}
// --- Riak Client ---
class RiakClient {
  ring: RiakRing
  replicationFactor: number = 3
  physicalNodes: Map<string, boolean> = new Map() // nodeId → alive
  stats: { reads: number, writes: number, conflicts: number, repairs: number } =
    { reads: 0, writes: 0, conflicts: 0, repairs: 0 }
  // Writes parked on a fallback vnode while the owning vnode's physical node was
  // down; handed back to the owner once both nodes are alive again.
  private hints: { key: string, obj: RiakObject, target: VNode, holder: VNode }[] = []

  constructor(ring: RiakRing, replicationFactor: number = 3) {
    this.ring = ring
    this.replicationFactor = replicationFactor
    // Every physical node that hosts a vnode starts out alive
    for (const vnode of ring.vnodes) {
      this.physicalNodes.set(vnode.physicalNode, true)
    }
  }

  // --- Write with Vector Clocks ---
  /**
   * Coordinate a write of `value` to `key` on behalf of `clientNodeId`.
   *
   * The new version's clock is the pointwise maximum of every reachable
   * replica's clock, incremented at `clientNodeId`. It therefore descends from
   * every version the coordinator can see. A single object is stored on each
   * live vnode of the preference list. For each down vnode, the write is parked
   * on a fallback vnode and recorded as a hint (hinted handoff).
   */
  put(key: string, value: string, clientNodeId: string): void {
    this.stats.writes++
    const prefList = this.ring.findPrefList(key, this.replicationFactor)

    // Merge unconditionally: the pointwise max already yields the descendant
    // when one clock dominates, and the union when they are concurrent.
    let clock = new VectorClock()
    for (const vnode of prefList) {
      if (!this.isAlive(vnode)) continue
      const existing = vnode.data.get(key)
      if (existing) clock = this.mergeClocks(clock, existing.vectorClock)
    }
    // Increment the coordinator's counter in the vector clock
    clock.increment(clientNodeId)

    const obj: RiakObject = {
      key,
      value,
      vectorClock: clock,
      lastWriteTime: Date.now(),
    }

    // Write to all available replicas (hinted handoff for unavailable ones)
    for (const vnode of prefList) {
      if (this.isAlive(vnode)) {
        vnode.data.set(key, obj)
        continue
      }
      console.log(` ⚠️ Hinted handoff for ${key} → ${vnode.physicalNode} is down`)
      const fallback = this.findFallback(prefList)
      if (fallback) {
        fallback.data.set(key, obj)
        this.hints.push({ key, obj, target: vnode, holder: fallback })
      } else {
        console.log(` ⚠️ No live fallback vnode for ${key}; write to ${vnode.id} is lost`)
      }
    }
    console.log(` ✏️ PUT ${key} = "${value}" (client ${clientNodeId}) | clock: ${this.formatClock(clock)}`)
  }

  // --- Read with Sibling Resolution ---
  /**
   * Read `key` from every live replica in its preference list.
   *
   * Versions whose clocks are dominated by another replica's clock are stale,
   * not conflicting. Only mutually concurrent versions are returned as siblings
   * (`isConflict: true`). When a single causally-newest version exists, it is
   * returned, and every live replica that is missing the key or holds an older
   * version is read-repaired to it.
   */
  get(key: string): { value: string, isConflict: boolean, siblings?: [string, VectorClock][] } {
    this.stats.reads++
    const prefList = this.ring.findPrefList(key, this.replicationFactor)
    const liveVNodes = prefList.filter(v => this.isAlive(v))
    const results: RiakObject[] = []
    for (const vnode of liveVNodes) {
      const obj = vnode.data.get(key)
      if (obj) results.push(obj)
    }
    if (results.length === 0) {
      return { value: '<not found>', isConflict: false }
    }

    const frontier = this.causalFrontier(results)
    if (frontier.length > 1) {
      // Siblings found! Return the distinct concurrent versions for client resolution
      this.stats.conflicts++
      const siblings: [string, VectorClock][] = frontier.map(r => [r.value, r.vectorClock])
      console.log(` 🔀 GET ${key} → CONFLICT (${siblings.length} siblings)`)
      return {
        value: `[CONFLICT: ${frontier.map(r => r.value).join(' vs ')}]`,
        isConflict: true,
        siblings,
      }
    }

    // No conflict: exactly one version is causally newest
    const latest = frontier[0]
    console.log(` 📖 GET ${key} = "${latest.value}" | clock: ${this.formatClock(latest.vectorClock)}`)

    // Read repair: bring missing or causally older replicas up to date
    for (const vnode of liveVNodes) {
      const current = vnode.data.get(key)
      if (current && current.vectorClock.compare(latest.vectorClock) !== -1) continue
      this.stats.repairs++
      vnode.data.set(key, latest)
      console.log(` 🔧 Read repair on ${vnode.physicalNode} → updated to "${latest.value}"`)
    }
    return { value: latest.value, isConflict: false }
  }

  // --- LWW Mode (last-write-wins, no siblings) ---
  /**
   * Return the value with the greatest `lastWriteTime` among live replicas,
   * or '<not found>' if none holds the key. On ties, the first replica in
   * preference-list order wins.
   */
  getLWW(key: string): string {
    this.stats.reads++
    let newest: RiakObject | undefined
    for (const vnode of this.ring.findPrefList(key, this.replicationFactor)) {
      if (!this.isAlive(vnode)) continue
      const obj = vnode.data.get(key)
      // Track the object itself so a version written at time 0 is still found
      if (obj && (!newest || obj.lastWriteTime > newest.lastWriteTime)) {
        newest = obj
      }
    }
    return newest ? newest.value : '<not found>'
  }

  // --- Failure Simulation ---
  failNode(nodeId: string) {
    this.physicalNodes.set(nodeId, false)
    console.log(`\n💥 ${nodeId} FAILED`)
  }

  /** Mark `nodeId` alive again and hand back any writes parked on its behalf. */
  recoverNode(nodeId: string) {
    this.physicalNodes.set(nodeId, true)
    console.log(`\n🔄 ${nodeId} RECOVERED — delivering hinted handoffs; read repair fixes the rest`)
    this.deliverHints()
  }

  /** True when the physical node hosting `vnode` is known and marked alive. */
  private isAlive(vnode: VNode): boolean {
    return this.physicalNodes.get(vnode.physicalNode) === true
  }

  /** Pointwise maximum of two clocks, as a new clock (inputs are not mutated). */
  private mergeClocks(a: VectorClock, b: VectorClock): VectorClock {
    const merged = new VectorClock([...a.counters])
    for (const [node, count] of b.counters) {
      merged.counters.set(node, Math.max(count, merged.counters.get(node) ?? 0))
    }
    return merged
  }

  /**
   * Reduce `versions` to those not causally dominated by another version.
   * Versions with equal clocks collapse to the first one seen.
   */
  private causalFrontier(versions: RiakObject[]): RiakObject[] {
    let frontier: RiakObject[] = []
    for (const candidate of versions) {
      const superseded = frontier.some(kept => {
        const cmp = candidate.vectorClock.compare(kept.vectorClock)
        return cmp === -1 || cmp === 2
      })
      if (superseded) continue
      // Drop anything the candidate descends from, then keep the candidate
      frontier = frontier.filter(kept => candidate.vectorClock.compare(kept.vectorClock) !== 1)
      frontier.push(candidate)
    }
    return frontier
  }

  /**
   * First live vnode after the end of `prefList`, walking the ring in index
   * order, that is not itself part of `prefList`. Returns undefined if none exists.
   */
  private findFallback(prefList: VNode[]): VNode | undefined {
    const ringNodes = this.ring.vnodes
    if (prefList.length === 0 || ringNodes.length === 0) return undefined
    const last = ringNodes.indexOf(prefList[prefList.length - 1])
    for (let step = 1; step <= ringNodes.length; step++) {
      const candidate = ringNodes[(last + step) % ringNodes.length]
      if (this.isAlive(candidate) && !prefList.includes(candidate)) return candidate
    }
    return undefined
  }

  /** Hand every hint whose owner and holder are both alive back to its owner. */
  private deliverHints(): void {
    const ready = this.hints.filter(h => this.isAlive(h.target) && this.isAlive(h.holder))
    this.hints = this.hints.filter(h => !ready.includes(h))
    for (const hint of ready) {
      const current = hint.target.data.get(hint.key)
      // Keep the owner's copy only if it already descends from the hinted write.
      // A concurrent copy is overwritten because a vnode stores a single version.
      if (!current || current.vectorClock.compare(hint.obj.vectorClock) !== 1) {
        hint.target.data.set(hint.key, hint.obj)
      }
      // The fallback only held the write on the owner's behalf
      if (hint.holder.data.get(hint.key) === hint.obj) {
        hint.holder.data.delete(hint.key)
      }
      console.log(` 📦 Handoff ${hint.key}: ${hint.holder.id} → ${hint.target.id} (${hint.target.physicalNode})`)
    }
  }

  private formatClock(vc: VectorClock): string {
    return `[${[...vc.counters.entries()].map(([k, v]) => `${k}:${v}`).join(', ')}]`
  }

  printStats() {
    console.log('\n📊 Riak Cluster Stats:')
    console.log(` Writes: ${this.stats.writes}`)
    console.log(` Reads: ${this.stats.reads}`)
    console.log(` Conflicts: ${this.stats.conflicts}`)
    console.log(` Repairs: ${this.stats.repairs}`)
  }
}
// --- Simulation ---
/**
 * Scripted demo: basic reads and writes, a vector-clock conflict and its
 * resolution, a node failure with hinted handoff, and read repair of a stale
 * replica. Output goes to the console.
 */
function simulateRiak() {
  console.log('=== RIAK KV — VECTOR CLOCKS & CONSISTENT HASHING ===\n')
  // Create 12 vnodes spread round-robin across 4 physical nodes
  const nodes = ['N1', 'N2', 'N3', 'N4']
  const vnodes: VNode[] = []
  for (let i = 0; i < 12; i++) {
    const physicalNode = nodes[i % nodes.length]
    vnodes.push(new VNode(`vnode${i}`, physicalNode))
  }
  console.log(`Created ${vnodes.length} vnodes across ${nodes.length} physical nodes`)
  const ring = new RiakRing(vnodes)
  const client = new RiakClient(ring, 3)

  // --- Phase 1: Basic writes and reads ---
  console.log('\n=== PHASE 1: Basic operations ===')
  client.put('user:42', 'Alice', 'N1')
  client.put('user:99', 'Bob', 'N2')
  const r1 = client.get('user:42')
  console.log(` Result: user:42 = "${r1.value}"`)

  // --- Phase 2: Vector clock conflicts (concurrent writes) ---
  console.log('\n=== PHASE 2: Concurrent writes — vector clock conflict ===')
  // Write from N1 (clock: N1:1)
  client.put('cart:100', 'item:A', 'N1')
  // A write coordinated by N2 that never saw N1's update (clock: N2:1).
  // client.put() stores one version on every live replica, so two puts can
  // never leave concurrent versions behind. Model the partitioned write by
  // injecting it into a single replica.
  const partitionedVNode = ring.findPrefList('cart:100', 3)[1]
  partitionedVNode.data.set('cart:100', {
    key: 'cart:100',
    value: 'item:B',
    vectorClock: new VectorClock([['N2', 1]]),
    lastWriteTime: Date.now(),
  })
  console.log(` ✏️ N2 wrote "item:B" to ${partitionedVNode.id} without seeing "item:A" | clock: [N2:1]`)
  // The two clocks are concurrent → conflict!
  const r2 = client.get('cart:100')
  console.log(` Result: cart:100 = "${r2.value}"`)
  // Client resolves the conflict by writing the chosen winner back
  if (r2.isConflict && r2.siblings) {
    console.log(` 👉 Client chose "item:A" as the winner`)
    client.put('cart:100', 'item:A', 'N3')
  }
  const r3 = client.get('cart:100')
  console.log(` After resolution: cart:100 = "${r3.value}"`)

  // --- Phase 3: Node failure and hinted handoff ---
  console.log('\n=== PHASE 3: Node failure with hinted handoff ===')
  client.failNode('N2')
  client.put('order:500', 'processing', 'N1')
  client.recoverNode('N2')
  const r4 = client.get('order:500')
  console.log(` After recovery: order:500 = "${r4.value}"`)

  // --- Phase 4: Read repair ---
  console.log('\n=== PHASE 4: Read repair ===')
  // Write to all nodes
  client.put('settings:theme', 'dark', 'N1')
  // Simulate a stale replica (via direct manipulation of one vnode)
  const strayVNode = ring.findPrefList('settings:theme', 3)[1]
  strayVNode.data.set('settings:theme', {
    key: 'settings:theme',
    value: 'light',
    vectorClock: new VectorClock([['N0', 0]]),
    lastWriteTime: 0,
  })
  // Read triggers read repair
  const r5 = client.get('settings:theme')
  console.log(` After read repair: settings:theme = "${r5.value}"`)
  console.log(` Stale vnode is now: ${strayVNode.data.get('settings:theme')?.value}`)

  // --- Summary ---
  console.log('\n=== SUMMARY ===')
  client.printStats()
  console.log('\n✅ Riak simulation complete.')
}
simulateRiak()
// ⬇ Download riak.ts