-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsweeper.js
138 lines (115 loc) · 4.2 KB
/
sweeper.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
const moment = require('moment')
const { PromisePool } = require('@supercharge/promise-pool')
const { groupBy, prop, sortBy, uniq, flatten } = require('ramda')
const airCourtsWrapper = require('./index')
const ddb = require('./aws/ddb')
const { s3, BUCKET_NAME } = require('./aws/s3')
const airCourtsCoimbraClubs = require('./clubs-coimbra.json')
const { getSubscribedDates } = require('./notifier')
const dateUtils = require('./date-utils')
const { NUMBER_OF_DAYS_SWEEP } = require('./config')
const coimbraClubIds = airCourtsCoimbraClubs.map((club) => club.id)
const sweep = async ({ weekDate, startTime } = {}) => {
const { validDates } = await getSubscribedDates()
const nextWeekDays = dateUtils.next(NUMBER_OF_DAYS_SWEEP, weekDate)
const weekDays = uniq(flatten([validDates, nextWeekDays])).sort()
await processAvailabilities(weekDays, startTime)
}
const processAvailabilities = async (weekDays, startTime) => {
const availabilities = await airCourtsWrapper.getClubsAvailability({
clubIds: coimbraClubIds,
weekDays,
startTime: startTime,
sport: 4 // Padel
})
const courtsById = availabilities?.courtsById || []
const slotsById = availabilities?.slotsById || []
const slotIds = Object.keys(slotsById)
const upsertData = slotIds.map(slotId => {
const slot = slotsById[slotId];
const court = courtsById[slot.courtId]
const timestamp = `${slot.date} ${slot.start}`
const timestampMoment = moment(timestamp)
return {
slot_id: slot.id,
start_date: slot.date,
timestamp: timestamp,
ttl: timestampMoment.clone().add(6, 'day').toDate().getTime(),
data: {
court: {
id: court.id,
name: court.name,
clubId: court.clubId,
clubName: court.clubName
}
}
}
})
const sortedUpsertData = sortBy(prop('ttl'))(upsertData)
//await sendToDDB(upsertData, availabilities)
await sendToS3(sortedUpsertData, availabilities)
}
const sendToDDB = async (upsertData, availabilities) => {
return bulkUpsert(upsertData)
}
const bulkUpsert = async (dataToUpsert) => {
const tableName = 'slots';
const batchSize = 25; // DynamoDB limit is 25
// Chunk the data into batches
const dataBatches = [];
for (let i = 0; i < dataToUpsert.length; i += batchSize) {
const batch = dataToUpsert.slice(i, i + batchSize);
dataBatches.push(batch);
}
// Perform batch upserts with concurrency of 1 (serial execution)
const upsertParams = dataBatches.map(batch => {
const batchWriteParams = {
RequestItems: {
[tableName]: batch.map(item => ({
PutRequest: {
Item: item,
},
})),
},
};
return batchWriteParams
});
return PromisePool
.withConcurrency(1)
.for(upsertParams)
.process(async (batch) => {
return ddb.batchWrite(batch).promise();
})
}
const sendToS3 = async (upsertData, availabilities) => {
const dataByDate = groupBy(prop('start_date'))(upsertData)
return batchWriteJsonToS3(dataByDate, BUCKET_NAME)
}
const batchWriteJsonToS3 = async (jsonObject, bucketName) => {
// Iterate through each date key in the JSON object
const upsertParams = Object.keys(jsonObject).map(date => {
const jsonContent = jsonObject[date];
const fileName = `data/${date}.json`;
// Convert the JSON content to a string
const jsonString = JSON.stringify(jsonContent);
// Define the S3 object parameters
const s3Params = {
Bucket: bucketName,
Key: fileName,
Body: jsonString,
ContentType: 'application/json',
};
return s3Params
})
return PromisePool
.withConcurrency(3)
.for(upsertParams)
.process(async (s3Params) => {
// Upload the JSON content to S3
await s3.upload(s3Params).promise();
console.log(`Uploaded ${s3Params.Key} to S3`);
})
}
module.exports = {
sweep
}