-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscriptions.go
311 lines (270 loc) · 8.69 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
package twitchwh
import (
"bytes"
"encoding/json"
"io"
"net/http"
"time"
)
// Condition for subscription. Empty values will be omitted. Fill out the options applicable to your subscription type
type Condition struct {
// broadcaster_user_id
BroadcasterUserID string `json:"broadcaster_user_id,omitempty"`
// moderator_user_id
ModeratorUserID string `json:"moderator_user_id,omitempty"`
// user_id
UserID string `json:"user_id,omitempty"`
// from_broadcaster_id
FromBroadcasterUserID string `json:"from_broadcaster_user_id,omitempty"`
// to_broadcaster_id
ToBroadcasterUserID string `json:"to_broadcaster_user_id,omitempty"`
// reward_id
//
// This should be int/string depending on subscription type
RewardID any `json:"reward_id,omitempty"`
// client_id
ClientID string `json:"client_id,omitempty"`
// extension_client_id
ExtensionClientID string `json:"extension_client_id,omitempty"`
// conduit_id
ConduitID string `json:"conduit_id,omitempty"`
// organization_id
OrganizationID string `json:"organization_id,omitempty"`
// category_id
CategoryID string `json:"category_id,omitempty"`
// campaign_id
CampaignID string `json:"campaign_id,omitempty"`
}
type Subscription struct {
ID string `json:"id"`
Status string `json:"status"`
Type string `json:"type"`
Version string `json:"version"`
Cost int `json:"cost"`
// PLEASE NOTE that this will DEFAULT all unused conditions. Check the Type and get the correct condition for that type.
Condition Condition `json:"condition"`
Transport struct {
Method string `json:"method"`
Callback string `json:"callback"`
} `json:"transport"`
CreatedAt time.Time `json:"created_at"`
}
type transport struct {
Method string `json:"method"`
Callback string `json:"callback"`
Secret string `json:"secret"`
}
type subscriptionRequest struct {
Type string `json:"type"`
Version string `json:"version"`
Condition Condition `json:"condition"`
Transport transport `json:"transport"`
}
// AddSubscription attemps to create a new subscription based on the type, version, and condition.
// You can find all subscription types, versions, and conditions at: [EventSub subscription types].
// It will block until Twitch sends the verification request, or timeout after 10 seconds.
//
// !! AddSubscription should only be called AFTER [twitchwh.Client.Handler] is set up accordingly. !!
//
// // Setup the HTTP event handler
// http.HandleFunc("/eventsub", client.Handler)
// go http.ListenAndServe(":8080", nil)
//
// _ := client.AddSubscription("stream.online", "1", twitchwh.Condition{
// BroadcasterUserID: "215185844",
// })
//
// [EventSub subscription types]: https://dev.twitch.tv/docs/eventsub/eventsub-subscription-types/
func (c *Client) AddSubscription(Type string, version string, condition Condition) error {
reqBody, err := json.Marshal(subscriptionRequest{
Type: Type,
Version: version,
Condition: condition,
Transport: transport{
Method: "webhook",
Callback: c.webhookURL,
Secret: c.webhookSecret,
},
})
if err != nil {
return &InternalError{"Could not serialize request body to JSON", err}
}
request, err := http.NewRequest("POST", helixURL+"/eventsub/subscriptions", bytes.NewBuffer(reqBody))
if err != nil {
return &InternalError{"Could not create request", err}
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set("Client-ID", c.clientID)
request.Header.Set("Authorization", "Bearer "+c.token)
res, err := c.httpClient.Do(request)
if err != nil {
return &InternalError{"Could not send request", err}
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return &InternalError{"Could not read response body", err}
}
if res.StatusCode == 409 {
return &DuplicateSubscriptionError{
Condition: condition,
Type: Type,
}
}
if res.StatusCode != 202 {
return &UnhandledStatusError{res.StatusCode, body}
}
var responseBody struct {
Data []Subscription `json:"data"`
}
err = json.Unmarshal(body, &responseBody)
if err != nil {
return &InternalError{"Could not parse response body", err}
}
// Returned body is an array that contains a single subscription
if len(responseBody.Data) < 1 {
return &InternalError{"Helix did not return the subscription they were supposed to", nil}
}
subscription := responseBody.Data[0]
// Await confirmation
for {
select {
case id := <-c.verifiedSubscriptions:
if id == subscription.ID {
c.logger.Printf("Subscription created: %s", subscription.ID)
return nil
} else {
// Verified subscription was not for this subscription
c.logger.Println("Subscription confirmation did not match ID, ignoring...")
c.verifiedSubscriptions <- id
continue
}
case <-time.After(10 * time.Second):
return &VerificationTimeoutError{subscription}
}
}
}
// RemoveSubscription attempts to remove a subscription based on the ID.
func (c *Client) RemoveSubscription(id string) error {
url := "/eventsub/subscriptions?id=" + id
res, err := c.genericRequest("DELETE", url)
if err != nil {
return &InternalError{"Could not make request", err}
}
if res.StatusCode == 204 {
return nil
}
if res.StatusCode == 401 {
return &UnauthorizedError{}
}
if res.StatusCode == 404 {
return &SubscriptionNotFoundError{}
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return &InternalError{"Could not read response body", err}
}
return &UnhandledStatusError{
Status: res.StatusCode,
Body: body,
}
}
// RemoveSubscriptionByType attempts to remove a subscription based on the type and condition.
//
// If no subscriptions are found, it will return nil.
//
// Note: This will remove ALL subscriptions that match the provided type and condition.
func (c *Client) RemoveSubscriptionByType(Type string, condition Condition) error {
subs, err := c.GetSubscriptionsByType(Type)
if err != nil {
return err
}
for _, sub := range subs {
// Both of these conditions have unused fields, but since they are both defaulted and of the same type it should be fine
if sub.Condition == condition {
c.logger.Printf("Removing subscription %s", sub.ID)
err := c.RemoveSubscription(sub.ID)
if err != nil {
return err
}
}
}
return nil
}
// Internal function to fetch subscriptions using the provided URL parameters.
// Used by wrapper functions.
// Automatically handles pagination.
func (c *Client) fetchSubscriptions(urlParams string) (subscriptions []Subscription, err error) {
page := 1
cursor := ""
for {
c.logger.Printf("Fetching page %d of subscriptions", page)
page++
var params string
if cursor != "" {
if urlParams == "" {
params = urlParams + "?after=" + cursor
} else {
params = "&after=" + cursor
}
}
res, err := c.genericRequest("GET", "/eventsub/subscriptions"+params)
if err != nil {
return nil, &InternalError{"Could not make request", err}
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, &InternalError{"Could not read response body", err}
}
if res.StatusCode == 401 {
return nil, &UnauthorizedError{body}
}
if res.StatusCode != 200 {
return nil, &UnhandledStatusError{res.StatusCode, body}
}
var responseStruct struct {
Data []Subscription `json:"data"`
Pagination struct {
Cursor string `json:"cursor"`
} `json:"pagination"`
}
err = json.Unmarshal(body, &responseStruct)
if err != nil {
return nil, &InternalError{"Could not parse response body", err}
}
subscriptions = append(subscriptions, responseStruct.Data...)
if responseStruct.Pagination.Cursor == "" {
// No more subscriptions to fetch
break
}
cursor = responseStruct.Pagination.Cursor
}
return subscriptions, nil
}
// GetSubscriptions retrieves all subscriptions, including revoked ones.
// Automatically handles pagination.
//
// Returns subscriptions and an error (if any).
func (c *Client) GetSubscriptions() (subscriptions []Subscription, err error) {
urlParams := ""
return c.fetchSubscriptions(urlParams)
}
// Get all subscriptions that match the provided type (eg. "stream.online").
// Automatically handles pagination.
//
// Returns subscriptions and an error (if any).
func (c *Client) GetSubscriptionsByType(Type string) (subscriptions []Subscription, err error) {
urlParams := "?type=" + Type
return c.fetchSubscriptions(urlParams)
}
// Get all subscriptions with the provided status.
// For a list of all status types see: https://dev.twitch.tv/docs/api/reference/#get-eventsub-subscriptions .
// Automatically handles pagination.
//
// Returns subscriptions and an error (if any).
func (c *Client) GetSubscriptionsByStatus(status string) (subscriptions []Subscription, err error) {
urlParams := "?status=" + status
return c.fetchSubscriptions(urlParams)
}