-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.py
93 lines (72 loc) · 3.16 KB
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from confluent_kafka import Producer
import yfinance as yf
import time
import requests
import json
from datetime import datetime
# --- Stream identity ---------------------------------------------------
# Ticker symbol of the stock being published (AMZN); the fetch loop also
# uses it as the Kafka message key.
ticker_symbol = "AMZN"
# Kafka topic that receives the stock price events.
topic = "amzn-events"

# --- HTTP request headers for the quote endpoint -------------------------
# NOTE(review): the Authorization value is the literal placeholder
# "Bearer <token>" and is sent exactly as written - confirm whether a real
# token is expected here.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
    "Content-Type": "application/json",
    "Authorization": "Bearer <token>",
}

# --- Kafka client --------------------------------------------------------
# Client settings: broker address and the client id reported to the broker.
conf = {
    "bootstrap.servers": "localhost:9092",
    "client.id": "stock-price-producer",
}
# Shared producer instance, created once at import time.
producer = Producer(conf)
def _build_quote_record(data, timestamp):
    """Flatten a Yahoo Finance chart payload into the record sent to Kafka.

    Args:
        data: Decoded JSON body of the ``/v8/finance/chart`` response.
        timestamp: Pre-formatted fetch time stored under ``"timestamp"``.

    Returns:
        A JSON-serialisable dict with the quote fields published downstream.

    Raises:
        KeyError, IndexError, TypeError: If the payload does not have the
            expected ``chart.result[0]`` structure.
    """
    chart = data["chart"]["result"][0]
    meta = chart["meta"]
    quote = chart["indicators"]["quote"][0]
    closes = quote["close"]
    return {
        "timestamp": timestamp,
        "symbol": meta["symbol"],
        "price": meta["regularMarketPrice"],
        "currency": meta["currency"],
        "previousClose": meta["previousClose"],
        "volume": meta["regularMarketVolume"],
        "high": meta["regularMarketDayHigh"],
        "low": meta["regularMarketDayLow"],
        # First entry of the quote's "open" series.
        "open_price": quote["open"][0],
        # NOTE(review): this is the last entry of the quote's plain "close"
        # series, not an adjusted close; the key name is kept unchanged so
        # existing consumers keep working.
        "adj_close": closes[-1] if closes else None,
    }


def fetch_and_send_stock_price(poll_interval=5, request_timeout=10):
    """Poll the Yahoo Finance chart endpoint and publish each quote to Kafka.

    Runs forever: every iteration fetches the chart for ``ticker_symbol``,
    flattens it into a record, produces it to ``topic`` keyed by the ticker,
    flushes the producer, and then sleeps.

    Args:
        poll_interval: Seconds to sleep between iterations (default 5).
        request_timeout: Seconds to wait for the HTTP request before giving
            up (default 10). Without a timeout ``requests`` can block
            indefinitely on a stalled connection, freezing the whole loop.

    Returns:
        Never returns normally.
    """
    # Lower-cased so that, for the configured ticker, the request URL is the
    # same as the previously hard-coded ".../chart/amzn".
    url = f"https://query2.finance.yahoo.com/v8/finance/chart/{ticker_symbol.lower()}"

    while True:
        try:
            response = requests.get(url, headers=headers, timeout=request_timeout)
            # Surface 4xx/5xx responses as a clear HTTP error instead of a
            # confusing JSON/KeyError further down.
            response.raise_for_status()

            # Fetch time in local time, formatted for the outgoing record.
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            record = _build_quote_record(response.json(), timestamp)

            # Produce the record and block until it has been delivered.
            producer.produce(topic, key=ticker_symbol, value=json.dumps(record))
            producer.flush()

            # Same tuple-style summary the script has always printed.
            summary = (
                record["price"],
                record["currency"],
                record["previousClose"],
                record["volume"],
                record["high"],
                record["low"],
                record["open_price"],
                record["adj_close"],
            )
            print(f"{timestamp} - Sent {ticker_symbol} data to Kafka: {summary}")
        except Exception as e:
            # Deliberately broad: one failed fetch or send must not stop the
            # poller, so report the error and try again next cycle.
            print(f"Error fetching/sending stock price: {e}")

        # Wait before the next poll, after both success and failure.
        time.sleep(poll_interval)
# Start sending stock price data. The function loops on `while True`, so this
# call blocks for the lifetime of the process and nothing after it runs unless
# an error outside `Exception` (e.g. KeyboardInterrupt) propagates.
fetch_and_send_stock_price()