-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
144 lines (125 loc) · 5.53 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import * as IPFS from 'ipfs-core'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
const data = 'Hello, World!'
const fragmentCount = 10
const nodeCount = 20
const shardSize = 1000
const shardSizeBytes = shardSize * 1024
const shardCount = Math.ceil(data.length / shardSizeBytes)
const shards = []
const pubsubTopic = 'sharding'
async function main() {
const nodes = []
try {
for (let i = 0; i < nodeCount; i++) {
const node = await IPFS.create({
repo: `./node${i}`,
config: {
Addresses: {
Swarm: []
},
relay: {
enabled: true, // enable relay dialer/listener (STOP)
hop: {
enabled: true // make this node a relay (HOP)
}
},
pubsub: {
enabled: true // enable pubsub
}
}
})
nodes.push(node)
console.log(`Node ${i} started`)
}
// Subscribe to pubsub topic
nodes.forEach(async (node) => {
await node.pubsub.subscribe(pubsubTopic, (msg) => {
console.log(`Received message from node ${msg.from}: ${msg.data.toString()}`)
})
})
const shardedData = []
for (let i = 0; i < shardCount; i++) {
shardedData.push(uint8ArrayFromString(`shard-${i}`))
}
const shards = []
// Add redundancy by splitting the shards into fragments and storing each fragment in a different node
for (let i = 0; i < shardCount; i++) {
const shard = shardedData[i]
const shardSize = shard.byteLength
let offset = 0
const fragments = []
while (offset < shardSize) {
const chunkSize = Math.min(shardSizeBytes, shardSize - offset)
const chunk = shard.subarray(offset, offset + chunkSize)
const fragment = { data: chunk, nodes: [] }
for (let j = 0; j < fragmentCount; j++) {
const nodeIndex = (i * fragmentCount + j) % nodeCount
const node = nodes[nodeIndex]
let cid
try { cid = await node.add(chunk) } catch (err) { return console.error(`Error adding chunk to node ${nodeIndex}`, err) }
fragment.nodes.push({ nodeIndex, cid })
}
fragments.push(fragment)
offset += chunkSize
}
// Publish message to pubsub topic with shard information
const shardInfo = { shardIndex: i, fragments }
const msg = uint8ArrayFromString(JSON.stringify(shardInfo))
await nodes.forEach(async (node) => { await node.pubsub.publish(pubsubTopic, msg) })
shards.push(shardInfo)
}
// Download and reassemble shards from nodes
async function downloadAndReassemble(shardInfo) {
const { fragments } = shardInfo
const reassembledShard = new Uint8Array(shardSizeBytes)
for (let i = 0; i < fragments.length; i++) {
const fragment = fragments[i]
const { data: fragmentData, nodes } = fragment
let reassembledOffset = 0
for (let j = 0; j < nodes.length; j++) {
const { nodeIndex, cid } = nodes[j]
const node = nodes[nodeIndex]
try {
const chunk = await node.get(cid)
reassembledShard.set(chunk, reassembledOffset)
reassembledOffset += chunk.byteLength
} catch (err) {
return console.error(`Error getting chunk from node ${nodeIndex}`, err)
}
}
if (!reassembledShard.every((byte, index) => byte === fragmentData[index])) {
return console.error(`Shard ${shardInfo.shardIndex} failed verification`)
}
}
console.log(`Shard ${shardInfo.shardIndex} reassembled successfully`)
return reassembledShard
}
// Download and reassemble all shards
async function downloadAndReassembleAll(shards) {
const reassembledData = new Uint8Array(data.length)
for (let i = 0; i < shards.length; i++) {
const shardInfo = shards[i]
const reassembledShard = await downloadAndReassemble(shardInfo)
if (reassembledShard) {
const shardStartIndex = shardInfo.shardIndex * shardSizeBytes
reassembledData.set(reassembledShard, shardStartIndex)
} else {
return console.error(`Failed to reassemble shard ${shardInfo.shardIndex}`)
}
}
console.log(`Data reassembled successfully`)
console.log(`Data: ${uint8ArrayToString(reassembledData)}`)
return reassembledData
}
// Convert Uint8Array to string
function uint8ArrayToString(data) {
return new TextDecoder().decode(data)
}
console.log(`Sharded ${data.length} bytes into ${shards.length} shards, each consisting of ${fragmentCount} fragments, and stored them across ${nodeCount} nodes`)
console.log(shards)
} catch (err) {
return console.error('Error creating IPFS nodes', err)
}
}
main()