Skip to content

Commit

Permalink
Write into PostgreSQL, implemented mutex and added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
arturogonzalezm committed Jul 30, 2024
1 parent b67b87a commit e03479a
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 49 deletions.
55 changes: 38 additions & 17 deletions cmd/monitor/main.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package main

import (
"database/sql"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/processor"
"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/websocket"
_ "github.com/lib/pq"
)

func main() {
Expand All @@ -29,28 +30,48 @@ func main() {
}
}()

// Create data directory
currentDir, err := os.Getwd()
if err != nil {
log.Fatal("Error getting current directory:", err)
}
dataDir := filepath.Join(currentDir, "data")
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
log.Fatal("Error creating directory:", err)
// Get database connection details from environment variables
dbHost := os.Getenv("DB_HOST")
dbUser := os.Getenv("DB_USER")
dbPassword := os.Getenv("DB_PASSWORD")
dbName := os.Getenv("DB_NAME")

fmt.Printf("DB_HOST: %s, DB_USER: %s, DB_PASSWORD: %s, DB_NAME: %s\n", dbHost, dbUser, dbPassword, dbName)

// Database connection setup
connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", dbUser, dbPassword, dbHost, dbName)
var db *sql.DB
var err error

// Retry connecting to the database until successful
for {
db, err = sql.Open("postgres", connStr)
if err != nil {
log.Printf("Error connecting to the database: %v", err)
} else if err = db.Ping(); err == nil {
break
}
log.Println("Waiting for the database to be ready...")
time.Sleep(2 * time.Second)
}
defer func() {
if err := db.Close(); err != nil {
log.Printf("Error closing the database: %v", err)
}
}()

// Create BufferedCSVWriter with a 5-second flush interval
csvWriter, err := processor.NewBufferedCSVWriter(filepath.Join(dataDir, fmt.Sprintf("%s_data.csv", symbol)), 100, 5*time.Second)
// Create PGWriter
pgWriter, err := processor.NewPGWriter(db)
if err != nil {
log.Fatal("Error creating CSV writer:", err)
log.Fatal("Error creating PostgreSQL writer:", err)
}
defer func() {
if err := csvWriter.Close(); err != nil {
log.Printf("Error closing CSV writer: %v", err)
if err := pgWriter.Close(); err != nil {
log.Printf("Error closing PostgreSQL writer: %v", err)
}
}()

client.AddProcessor(csvWriter)
client.AddProcessor(pgWriter)

stop := make(chan struct{})
go client.Listen(stop)
Expand All @@ -73,8 +94,8 @@ func main() {
return
case <-ticker.C:
// Print a summary every 5 seconds
log.Printf("Last 5 seconds: Processed %d messages", csvWriter.GetProcessedCount())
log.Printf("Current buffer size: %d", csvWriter.GetBufferSize())
log.Printf("Last 5 seconds: Processed %d messages", pgWriter.GetProcessedCount())
log.Printf("Current buffer size: %d", pgWriter.GetBufferSize())
}
}
}
65 changes: 39 additions & 26 deletions database/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,37 @@
DO
$do$
BEGIN
IF NOT EXISTS (
SELECT
FROM pg_catalog.pg_roles
WHERE rolname = 'postgres') THEN

IF NOT EXISTS (SELECT
FROM pg_catalog.pg_roles
WHERE rolname = 'postgres') THEN
CREATE ROLE postgres;
END IF;
END
$do$;
$do$;

-- Create a new database if it does not exist
DO
$do$
BEGIN
IF NOT EXISTS (
SELECT
FROM pg_database
WHERE datname = 'postgres') THEN

IF NOT EXISTS (SELECT
FROM pg_database
WHERE datname = 'postgres') THEN
CREATE DATABASE postgres;
END IF;
END
$do$;
$do$;

-- Create a new user with a password if it does not exist
DO
$do$
BEGIN
IF NOT EXISTS (
SELECT
FROM pg_catalog.pg_roles
WHERE rolname = 'postgres') THEN

IF NOT EXISTS (SELECT
FROM pg_catalog.pg_roles
WHERE rolname = 'postgres') THEN
CREATE USER postgres WITH ENCRYPTED PASSWORD 'postgres';
END IF;
END
$do$;
$do$;

-- Grant all privileges on the new database to the new user
GRANT ALL PRIVILEGES ON DATABASE postgres TO postgres;
Expand All @@ -49,13 +43,32 @@ GRANT ALL PRIVILEGES ON DATABASE postgres TO postgres;
\c postgres

-- Create the exchange_info table
CREATE TABLE IF NOT EXISTS exchange_info (
symbol VARCHAR(50),
status VARCHAR(50),
base_asset VARCHAR(50),
quote_asset VARCHAR(50),
filter_type VARCHAR(50),
filter_key VARCHAR(50),
CREATE TABLE IF NOT EXISTS exchange_info
(
symbol VARCHAR(50),
status VARCHAR(50),
base_asset VARCHAR(50),
quote_asset VARCHAR(50),
filter_type VARCHAR(50),
filter_key VARCHAR(50),
filter_value VARCHAR(50),
CONSTRAINT exchange_info_unique UNIQUE (symbol, filter_type, filter_key)
);
);

CREATE TABLE IF NOT EXISTS ticker_data
(
id SERIAL PRIMARY KEY,
event_time BIGINT,
symbol TEXT,
last_price DOUBLE PRECISION,
price_change DOUBLE PRECISION,
high_price DOUBLE PRECISION,
low_price DOUBLE PRECISION,
volume DOUBLE PRECISION,
quote_volume DOUBLE PRECISION,
open_time BIGINT,
close_time BIGINT,
trade_count INT,
latency BIGINT
);

8 changes: 2 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@ require (
)

require (
github.com/adshao/go-binance/v2 v2.6.0 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/DATA-DOG/go-sqlmock v1.5.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace RealTimeBinanceMonitor => github.com/arturogonzalezm/RealTimeBinanceMonitor v1.0.0
replace github.com/arturogonzalezm/RealTimeBinanceMonitor v1.0.0 => github.com/arturogonzalezm/RealTimeBinanceMonitor v1.1.0
61 changes: 61 additions & 0 deletions internal/processor/pgwriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package processor

import (
"database/sql"
"fmt"
"sync"

"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/models"
)

// PGWriter implements DataProcessor interface for PostgreSQL
type PGWriter struct {
db *sql.DB
mutex sync.Mutex
processedCount int
}

// NewPGWriter creates a new PGWriter
func NewPGWriter(db *sql.DB) (*PGWriter, error) {
writer := &PGWriter{
db: db,
}

return writer, nil
}

// Process implements the DataProcessor interface
func (w *PGWriter) Process(data models.FormattedData) {
w.mutex.Lock()
defer w.mutex.Unlock()

_, err := w.db.Exec(`INSERT INTO ticker_data (
event_time, symbol, last_price, price_change, high_price, low_price, volume, quote_volume, open_time, close_time, trade_count, latency
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
data.EventTime, data.Symbol, data.LastPrice, data.PriceChange, data.HighPrice, data.LowPrice,
data.Volume, data.QuoteVolume, data.OpenTime, data.CloseTime, data.TradeCount, data.Latency,
)
if err != nil {
fmt.Printf("Error inserting data: %v\n", err)
return
}

w.processedCount++
}

// GetProcessedCount returns the number of processed messages
func (w *PGWriter) GetProcessedCount() int {
w.mutex.Lock()
defer w.mutex.Unlock()
return w.processedCount
}

// GetBufferSize returns the current size of the buffer (always 0 for immediate insert)
func (w *PGWriter) GetBufferSize() int {
return 0
}

// Close closes the PostgreSQL connection
func (w *PGWriter) Close() error {
return w.db.Close()
}
117 changes: 117 additions & 0 deletions internal/processor/pgwriter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package processor

import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/arturogonzalezm/RealTimeBinanceMonitor/internal/models"
"github.com/stretchr/testify/assert"
)

func TestNewPGWriter(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
defer func() {
mock.ExpectClose()
err := db.Close()
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}()

writer, err := NewPGWriter(db)
assert.NoError(t, err)
assert.NotNil(t, writer)
}

func TestPGWriter_Process(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
defer func() {
mock.ExpectClose()
err := db.Close()
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}()

writer, err := NewPGWriter(db)
assert.NoError(t, err)
assert.NotNil(t, writer)

data := models.FormattedData{
EventTime: 1625097600000,
Symbol: "btcusdt",
LastPrice: 34000.0,
PriceChange: 100.0,
HighPrice: 34500.0,
LowPrice: 33500.0,
Volume: 100.0,
QuoteVolume: 3400000.0,
OpenTime: 1625094000000,
CloseTime: 1625097600000,
TradeCount: 1000,
Latency: 100,
}

mock.ExpectExec(`INSERT INTO ticker_data`).
WithArgs(
data.EventTime, data.Symbol, data.LastPrice, data.PriceChange, data.HighPrice, data.LowPrice,
data.Volume, data.QuoteVolume, data.OpenTime, data.CloseTime, data.TradeCount, data.Latency,
).
WillReturnResult(sqlmock.NewResult(1, 1))

writer.Process(data)

assert.Equal(t, 1, writer.GetProcessedCount())
}

func TestPGWriter_GetProcessedCount(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
defer func() {
mock.ExpectClose()
err := db.Close()
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}()

writer, err := NewPGWriter(db)
assert.NoError(t, err)
assert.NotNil(t, writer)

assert.Equal(t, 0, writer.GetProcessedCount())
}

func TestPGWriter_GetBufferSize(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
defer func() {
mock.ExpectClose()
err := db.Close()
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}()

writer, err := NewPGWriter(db)
assert.NoError(t, err)
assert.NotNil(t, writer)

assert.Equal(t, 0, writer.GetBufferSize())
}

func TestPGWriter_Close(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
defer func() {
assert.NoError(t, mock.ExpectationsWereMet())
}()

writer, err := NewPGWriter(db)
assert.NoError(t, err)
assert.NotNil(t, writer)

mock.ExpectClose()

err = writer.Close()
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}

0 comments on commit e03479a

Please sign in to comment.