-
Notifications
You must be signed in to change notification settings - Fork 0
/
connect.py
338 lines (299 loc) · 13.1 KB
/
connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# Tap Note:
# This is the code that connects to Twitch / Youtube and checks for new messages.
# You should not need to modify anything in this file, just use as is.
import requests
import sys
import socket
import re
import random
import time
import os
import json
import concurrent.futures
import traceback
# Seconds Twitch.twitch_receive_messages waits (measured from the moment the
# login was sent) for the login reply before it reconnects.
MAX_TIME_TO_WAIT_FOR_LOGIN = 3
# Seconds added to the current time after a YouTube chat fetch completes to get
# the earliest time the next fetch may be started.
YOUTUBE_FETCH_INTERVAL = 1
class Twitch:
    """Anonymous, read-only Twitch chat client speaking raw IRC over a socket.

    Usage: call ``twitch_connect(channel)`` once, then call
    ``twitch_receive_messages()`` repeatedly. Each call reads whatever is
    available on the socket (waiting at most the socket timeout set in
    ``twitch_connect``) and returns the chat messages that were completed.
    """

    # Matches one complete, CRLF-terminated IRC line. Groups:
    #   1 = sender name (only set when the prefix contains a '!')
    #   2 = command
    #   3 = space-separated parameters before the trailing part
    #   4 = trailing text after " :"
    # Compiled once here instead of on every (re)connect.
    re_prog = re.compile(b'^(?::(?:([^ !\r\n]+)![^ \r\n]*|[^ \r\n]*) )?([^ \r\n]+)(?: ([^:\r\n]*))?(?: :([^\r\n]*))?\r\n', re.MULTILINE)
    sock = None
    # Bytes received so far that do not yet form a complete IRC line.
    partial = b''
    # Set to True once the server has answered the login with command '001'.
    login_ok = False
    channel = ''
    # time.time() at which the login was sent; used for the login timeout.
    login_timestamp = 0

    # Commands that are received during login/join and intentionally ignored.
    _IGNORED_COMMANDS = frozenset(('002', '003', '004', '375', '372', '376', '353', '366'))

    def twitch_connect(self, channel):
        """Open a new connection to Twitch chat and send an anonymous login.

        Any existing socket is closed first and all per-connection state is
        reset. The channel itself is joined later, by
        ``twitch_receive_messages``, once the server confirms the login.

        Args:
            channel: Channel name to join (without the leading '#').

        Raises:
            OSError: If the socket cannot connect or the login cannot be sent.
        """
        if self.sock:
            self.sock.close()
        self.sock = None
        self.partial = b''
        self.login_ok = False
        self.channel = channel

        # Create socket
        print('Connecting to Twitch...')
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # Attempt to connect socket
        self.sock.connect(('irc.chat.twitch.tv', 6667))

        # Log in anonymously
        user = 'justinfan%i' % random.randint(10000, 99999)
        print('Connected to Twitch. Logging in anonymously...')
        self.sock.send(('PASS asdf\r\nNICK %s\r\n' % user).encode())

        # Short timeout so that polling for messages never blocks for long.
        self.sock.settimeout(1.0/60.0)
        self.login_timestamp = time.time()

    def reconnect(self, delay):
        """Sleep for ``delay`` seconds, then reconnect to the current channel."""
        time.sleep(delay)
        self.twitch_connect(self.channel)

    def receive_and_parse_data(self):
        """Read everything currently available on the socket and parse it.

        Incomplete data at the end of the read is kept in ``self.partial`` and
        prepended to the data of the next call.

        Returns:
            A list of dicts, one per complete IRC line, with the keys
            ``'name'``, ``'command'``, ``'params'`` (list of str) and
            ``'trailing'``. Returns an empty list when nothing complete was
            received, or when the connection failed and a reconnect was made.
        """
        chunks = []
        while True:
            try:
                received = self.sock.recv(4096)
            except socket.timeout:
                # No more data available right now.
                break
            except Exception as e:
                print('Unexpected connection error. Reconnecting in a second...', e)
                self.reconnect(1)
                return []
            if not received:
                # recv() returning no data means the peer closed the connection.
                print('Connection closed by Twitch. Reconnecting in 5 seconds...')
                self.reconnect(5)
                return []
            chunks.append(received)

        buffer = b''.join(chunks)
        if not buffer:
            return []

        # Prepend unparsed data from previous iterations
        if self.partial:
            buffer = self.partial + buffer
            # Must stay a bytes object: it is concatenated with bytes below and
            # on the next call. (Resetting it to a list made a message that was
            # split across three reads raise TypeError.)
            self.partial = b''

        # Parse irc messages
        matches = list(self.re_prog.finditer(buffer))
        res = [
            {
                'name': (match.group(1) or b'').decode(errors='replace'),
                'command': (match.group(2) or b'').decode(errors='replace'),
                'params': [p.decode(errors='replace') for p in (match.group(3) or b'').split(b' ')],
                'trailing': (match.group(4) or b'').decode(errors='replace'),
            }
            for match in matches
        ]

        # Save any data we couldn't parse for the next iteration
        if not matches:
            self.partial += buffer
        else:
            end = matches[-1].end()
            if end < len(buffer):
                self.partial = buffer[end:]

            if matches[0].start() != 0:
                # If we get here, we might have missed a message. pepeW
                print('either ddarknut fucked up or twitch is bonkers, or both I mean who really knows anything at this point')

        return res

    def twitch_receive_messages(self):
        """Process pending IRC traffic and return the new chat messages.

        Besides collecting chat messages this answers PING, joins the channel
        once the login is confirmed, and reconnects when the login reply has
        not arrived within MAX_TIME_TO_WAIT_FOR_LOGIN seconds.

        Returns:
            A list of ``{'username': str, 'message': str}`` dicts, one per
            PRIVMSG received. Empty when a login-timeout reconnect happened.
        """
        privmsgs = []
        for irc_message in self.receive_and_parse_data():
            cmd = irc_message['command']
            if cmd == 'PRIVMSG':
                privmsgs.append({
                    'username': irc_message['name'],
                    'message': irc_message['trailing'],
                })
            elif cmd == 'PING':
                self.sock.send(b'PONG :tmi.twitch.tv\r\n')
            elif cmd == '001':
                print('Successfully logged in. Joining channel %s.' % self.channel)
                self.sock.send(('JOIN #%s\r\n' % self.channel).encode())
                self.login_ok = True
            elif cmd == 'JOIN':
                print('Successfully joined channel %s' % irc_message['params'][0])
            elif cmd == 'NOTICE':
                print('Server notice:', irc_message['params'], irc_message['trailing'])
            elif cmd in self._IGNORED_COMMANDS:
                continue
            else:
                print('Unhandled irc message:', irc_message)

        if not self.login_ok:
            # We are still waiting for the initial login message. If we've waited longer than we should, try to reconnect.
            if time.time() - self.login_timestamp > MAX_TIME_TO_WAIT_FOR_LOGIN:
                print('No response from Twitch. Reconnecting...')
                self.reconnect(0)
                return []

        return privmsgs
# Thanks to Ottomated for helping with the yt side of things!
class YouTube:
    """Reads a YouTube livestream's chat by polling YouTube's web endpoints.

    Usage: call ``youtube_connect(channel_id, stream_url)`` once, then call
    ``twitch_receive_messages()`` repeatedly. The polling method carries the
    same name and returns the same message shape as
    ``Twitch.twitch_receive_messages``, presumably so callers can treat both
    classes alike. HTTP requests for new messages run on a single worker
    thread so that polling does not block on the network.
    """

    session = None
    # Parsed ``ytcfg.set({...})`` object from the live chat page.
    config = {}
    # JSON body sent with every get_live_chat request; its 'continuation'
    # entry is replaced after each successful fetch.
    payload = {}
    # Defaults so that reconnect() does not raise AttributeError when it runs
    # before youtube_connect() has stored the real values.
    channel_id = None
    stream_url = None
    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    # Future of the fetch currently in flight, or None.
    fetch_job = None
    # Earliest time.time() at which the next fetch may be started.
    next_fetch_time = 0
    re_initial_data = re.compile('(?:window\\s*\\[\\s*[\\"\']ytInitialData[\\"\']\\s*\\]|ytInitialData)\\s*=\\s*({.+?})\\s*;')
    re_config = re.compile('(?:ytcfg\\s*.set)\\(({.+?})\\)\\s*;')

    def get_continuation_token(self, data):
        """Return the continuation token found in a live chat response.

        Args:
            data: Parsed JSON containing
                ``continuationContents.liveChatContinuation.continuations``.

        Raises:
            KeyError / IndexError: If the expected structure is missing.
        """
        cont = data['continuationContents']['liveChatContinuation']['continuations'][0]
        if 'timedContinuationData' in cont:
            return cont['timedContinuationData']['continuation']
        else:
            return cont['invalidationContinuationData']['continuation']

    def reconnect(self, delay):
        """Drop all connection state, wait ``delay`` seconds and connect again."""
        if self.fetch_job and self.fetch_job.running():
            if not self.fetch_job.cancel():
                print("Waiting for fetch job to finish...")
                try:
                    self.fetch_job.result()
                except Exception:
                    # The job is being discarded anyway; a failure inside it
                    # must not abort the reconnect.
                    traceback.print_exc()
        print(f"Retrying in {delay}...")
        if self.session:
            self.session.close()
        self.session = None
        self.config = {}
        self.payload = {}
        self.fetch_job = None
        self.next_fetch_time = 0
        time.sleep(delay)
        self.youtube_connect(self.channel_id, self.stream_url)

    def _exit_with_error(self, message):
        """Print ``message``, pause 5 seconds so it can be read, then exit with status 1."""
        print(message)
        time.sleep(5)
        # sys.exit instead of the site-provided exit(), which is not always defined.
        sys.exit(1)

    def youtube_connect(self, channel_id, stream_url=None):
        """Locate the livestream's chat and prepare for fetching messages.

        Loads the livestream page, follows it to the live chat page, and stores
        the config and request payload that ``fetch_messages`` needs.

        Args:
            channel_id: Channel ID (or custom channel name) whose ``/live``
                page is loaded when ``stream_url`` is not given.
            stream_url: Optional direct URL of the stream; takes precedence
                over ``channel_id``.

        Raises:
            SystemExit: With status 1 if any page cannot be loaded or does not
                contain the expected data (e.g. the channel is not live).
        """
        print("Connecting to YouTube...")

        self.channel_id = channel_id
        self.stream_url = stream_url

        # Create http client session
        self.session = requests.Session()
        # Spoof user agent so yt thinks we're an upstanding browser
        self.session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36'
        # Add consent cookie to bypass google's consent page
        requests.utils.add_dict_to_cookiejar(self.session.cookies, {'CONSENT': 'YES+'})

        # Connect using stream_url if provided, otherwise use the channel_id
        if stream_url is not None:
            live_url = self.stream_url
        else:
            live_url = f"https://youtube.com/channel/{self.channel_id}/live"

        res = self.session.get(live_url)
        if res.status_code == 404:
            # Retry with the custom-name form of the channel URL.
            live_url = f"https://youtube.com/c/{self.channel_id}/live"
            res = self.session.get(live_url)
        if not res.ok:
            if stream_url is not None:
                self._exit_with_error(f"Couldn't load the stream URL ({res.status_code} {res.reason}). Is the stream URL correct? {self.stream_url}")
            else:
                self._exit_with_error(f"Couldn't load livestream page ({res.status_code} {res.reason}). Is the channel ID correct? {self.channel_id}")
        livestream_page = res.text

        # Find initial data in livestream page
        match = self.re_initial_data.search(livestream_page)
        if match is None:
            self._exit_with_error("Couldn't find initial data in livestream page")
        initial_data = json.loads(match.group(1))

        # Get continuation token for live chat iframe
        iframe_continuation = None
        try:
            iframe_continuation = initial_data['contents']['twoColumnWatchNextResults']['conversationBar']['liveChatRenderer']['header']['liveChatHeaderRenderer']['viewSelector']['sortFilterSubMenuRenderer']['subMenuItems'][1]['continuation']['reloadContinuationData']['continuation']
        except (KeyError, IndexError, TypeError):
            self._exit_with_error(f"Couldn't find the livestream chat. Is the channel not live? url: {live_url}")

        # Fetch live chat page
        res = self.session.get(f'https://youtube.com/live_chat?continuation={iframe_continuation}')
        if not res.ok:
            self._exit_with_error(f"Couldn't load live chat page ({res.status_code} {res.reason})")
        live_chat_page = res.text

        # Find initial data in live chat page
        match = self.re_initial_data.search(live_chat_page)
        if match is None:
            self._exit_with_error("Couldn't find initial data in live chat page")
        initial_data = json.loads(match.group(1))

        # Find config data
        match = self.re_config.search(live_chat_page)
        if match is None:
            self._exit_with_error("Couldn't find config data in live chat page")
        self.config = json.loads(match.group(1))

        # Create payload object for making live chat requests
        token = self.get_continuation_token(initial_data)
        self.payload = {
            "context": self.config['INNERTUBE_CONTEXT'],
            "continuation": token,
            "webClientInfo": {
                "isDocumentHidden": False
            },
        }
        print("Connected.")

    def fetch_messages(self):
        """Request the chat messages posted since the previous fetch.

        Submitted to the worker thread by ``twitch_receive_messages``. On an
        HTTP error the session is closed and set to None, which makes the next
        ``twitch_receive_messages`` call reconnect.

        Returns:
            A list of ``{'author': str, 'content': list}`` dicts, where
            ``content`` is the message's list of runs. Empty on HTTP or parse
            failure.
        """
        payload_bytes = bytes(json.dumps(self.payload), "utf8")
        res = self.session.post(f"https://www.youtube.com/youtubei/v1/live_chat/get_live_chat?key={self.config['INNERTUBE_API_KEY']}&prettyPrint=false", payload_bytes)
        if not res.ok:
            print(f"Failed to fetch messages. {res.status_code} {res.reason}")
            print("Body:", res.text)
            print("Payload:", payload_bytes)
            self.session.close()
            self.session = None
            return []
        try:
            data = json.loads(res.text)
            self.payload['continuation'] = self.get_continuation_token(data)
            cont = data['continuationContents']['liveChatContinuation']
            messages = []
            for action in cont.get('actions', ()):
                # Only plain text chat messages are collected; every other
                # action or item type is skipped.
                item = action.get('addChatItemAction', {}).get('item', {}).get('liveChatTextMessageRenderer')
                if item is None:
                    continue
                messages.append({
                    'author': item['authorName']['simpleText'],
                    'content': item['message']['runs']
                })
            return messages
        except Exception:
            print("Failed to parse messages.")
            print("Body:", res.text)
            traceback.print_exc()
        return []

    @staticmethod
    def _message_text(runs):
        """Join message runs into one string: text runs as-is, emoji runs by their emojiId."""
        pieces = []
        for part in runs:
            if 'text' in part:
                pieces.append(part['text'])
            elif 'emoji' in part:
                pieces.append(part['emoji']['emojiId'])
        return ''.join(pieces)

    def twitch_receive_messages(self):
        """Poll for new chat messages without blocking on the network.

        Each call either starts a background fetch (at most one per
        YOUTUBE_FETCH_INTERVAL seconds) or briefly checks whether the fetch in
        flight has finished. Reconnects first if the session was dropped.

        Returns:
            A list of ``{'username': str, 'message': str}`` dicts. Always a
            list; empty when no fetch finished during this call or the fetch
            failed.
        """
        if self.session is None:
            self.reconnect(0)

        if not self.fetch_job:
            time.sleep(1.0/60.0)
            if time.time() > self.next_fetch_time:
                self.fetch_job = self.thread_pool.submit(self.fetch_messages)
            return []

        try:
            res = self.fetch_job.result(1.0/60.0)
        except concurrent.futures.TimeoutError:
            # Fetch still running; try again on the next call.
            return []
        except Exception:
            traceback.print_exc()
            # Drop the session so that the next call reconnects.
            if self.session:
                self.session.close()
            self.session = None
            # Return a list like every other path so callers can always iterate.
            return []

        self.fetch_job = None
        self.next_fetch_time = time.time() + YOUTUBE_FETCH_INTERVAL
        return [
            {
                'username': item['author'],
                'message': self._message_text(item['content']),
            }
            for item in res
        ]