-
Notifications
You must be signed in to change notification settings - Fork 1
/
ohaio_bot.py
55 lines (39 loc) · 1.41 KB
/
ohaio_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import tweepy
import pytz
import random
import ohaio_markov
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.triggers.cron import CronTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
# Timezone handed to the scheduler below.
io_timezone = pytz.timezone("America/Toronto")
# Corpus file passed to ohaio_markov.load_corpus().
corpus_filename = "morning_dataset.txt"

# Jobs are kept in memory only.
jobstores = {
    'default': MemoryJobStore(),
}
# Thread pool created with a size argument of 1.
executors = {
    'default': ThreadPoolExecutor(1),
}
job_defaults = {
    # APScheduler looks this option up under the exact key 'coalesce';
    # a misspelled key is silently ignored and the library default is used
    # instead, so the spelling matters for the False setting to take effect.
    'coalesce': False,
}
scheduler = BlockingScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone=io_timezone,
)
def tweet(corpus_data):
    """Generate a tweet from *corpus_data* and post it.

    Credentials are read from the local ``tokens`` module, the text is
    produced by ``ohaio_markov.generate_tweet`` and echoed to stdout
    before being sent with ``tweepy.Client.create_tweet``.

    Args:
        corpus_data: Corpus object forwarded unchanged to
            ``ohaio_markov.generate_tweet``.
    """
    # Imported at call time, so the module itself can be loaded even when
    # ``tokens`` is not importable.
    import tokens

    credentials = {
        "consumer_key": tokens.CONSUMER_KEY,
        "consumer_secret": tokens.CONSUMER_SECRET,
        "access_token": tokens.ACCESS_TOKEN,
        "access_token_secret": tokens.ACCESS_SECRET,
    }
    client = tweepy.Client(**credentials)

    status_text = ohaio_markov.generate_tweet(corpus_data)
    announcement = "Tweeting {}\n".format(status_text)
    print(announcement)

    client.create_tweet(text=status_text)
def test_tweet():
    """Load the corpus from ``corpus_filename`` and post one tweet directly.

    Calls ``tweet`` straight away, without going through the scheduler.
    """
    tweet(ohaio_markov.load_corpus(corpus_filename))
if __name__ == "__main__":
    # Script entry point: load the corpus once, register the daily tweet
    # job, then hand control to the blocking scheduler.
    print("OHAAAAAIIIIOOOOO!")
    corpus_data = ohaio_markov.load_corpus(corpus_filename)
    # Daily at 08:00:00, randomly shifted by at most 30 minutes (jitter is
    # given in seconds). The timezone is passed explicitly because a
    # CronTrigger constructed by hand does not pick up the scheduler's
    # timezone; without it the trigger falls back to the host's local zone.
    ct = CronTrigger(
        hour=8,
        minute=0,
        second=0,
        jitter=60 * 30,
        timezone=io_timezone,
    )
    scheduler.add_job(tweet, trigger=ct, args=(corpus_data,))
    print("Starting scheduler!")
    scheduler.start()