Skip to content

Commit

Permalink
Merge pull request #5 from numbleroot/feature/gokit-logger
Browse files Browse the repository at this point in the history
Start to integrate go-kit's logger
  • Loading branch information
numbleroot authored May 11, 2017
2 parents 1a3c278 + 7dd7456 commit e204b84
Show file tree
Hide file tree
Showing 36 changed files with 2,811 additions and 211 deletions.
6 changes: 3 additions & 3 deletions comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package comm

import (
"fmt"
"log"
stdlog "log"
"time"

"crypto/tls"
Expand Down Expand Up @@ -70,7 +70,7 @@ func ReliableSend(conn *tls.Conn, text string, remoteAddr string, tlsConfig *tls
_, err = fmt.Fprintf(conn, "%s\r\n", text)
for err != nil {

log.Printf("[comm.ReliableSend] Sending to node '%s' failed, trying to recover...\n", remoteAddr)
stdlog.Printf("[comm.ReliableSend] Sending to node '%s' failed, trying to recover...\n", remoteAddr)

// Define an error we can deal with.
okError := fmt.Sprintf("write tcp %s->%s: write: broken pipe", conn.LocalAddr().String(), remoteAddr)
Expand All @@ -83,7 +83,7 @@ func ReliableSend(conn *tls.Conn, text string, remoteAddr string, tlsConfig *tls
return fmt.Errorf("could not reestablish connection with '%s': %s", remoteAddr, err.Error())
}

log.Printf("[comm.ReliableSend] Reconnected to '%s'.\n", remoteAddr)
stdlog.Printf("[comm.ReliableSend] Reconnected to '%s'.\n", remoteAddr)

// Resend message.
_, err = fmt.Fprintf(conn, "%s\r\n", text)
Expand Down
6 changes: 3 additions & 3 deletions comm/receiver-sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package comm_test

import (
"fmt"
"log"
stdlog "log"
"testing"
"time"

Expand Down Expand Up @@ -79,7 +79,7 @@ func TestSenderReceiver(t *testing.T) {
updMsg := <-n1ApplyCRDTUpdChan

// Log message.
log.Printf("[comm_test.TestSenderReceiver] %s: Would apply update from message here: %s\n", n1, updMsg)
stdlog.Printf("[comm_test.TestSenderReceiver] %s: Would apply update from message here: %s\n", n1, updMsg)

// Signal success.
n1DoneCRDTUpdChan <- struct{}{}
Expand All @@ -102,7 +102,7 @@ func TestSenderReceiver(t *testing.T) {
updMsg := <-n2ApplyCRDTUpdChan

// Log message.
log.Printf("[comm_test.TestSenderReceiver] %s: Would apply update from message here: %s\n", n2, updMsg)
stdlog.Printf("[comm_test.TestSenderReceiver] %s: Would apply update from message here: %s\n", n2, updMsg)

// Signal success.
n2DoneCRDTUpdChan <- struct{}{}
Expand Down
46 changes: 23 additions & 23 deletions comm/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"bytes"
"fmt"
"io"
"log"
stdlog "log"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -222,7 +222,7 @@ func (recv *Receiver) Shutdown(downRecv chan struct{}) {
// Wait for signal.
<-downRecv

log.Printf("[comm.Shutdown] Receiver: shutting down...\n")
stdlog.Printf("[comm.Shutdown] Receiver: shutting down...\n")

// Instruct other goroutines to shutdown.
recv.shutdown <- struct{}{}
Expand All @@ -243,7 +243,7 @@ func (recv *Receiver) Shutdown(downRecv chan struct{}) {
recv.socket.Close()
recv.lock.Unlock()

log.Printf("[comm.Shutdown] Receiver: done!\n")
stdlog.Printf("[comm.Shutdown] Receiver: done!\n")
}

// IncVClockEntry waits for an incoming name of a node on
Expand Down Expand Up @@ -291,7 +291,7 @@ func (recv *Receiver) IncVClockEntry() {
// Save updated vector clock to log file.
err := recv.SaveVClockEntries()
if err != nil {
log.Fatalf("[comm.IncVClockEntry] Saving updated vector clock to file failed: %s\n", err.Error())
stdlog.Fatalf("[comm.IncVClockEntry] Saving updated vector clock to file failed: %s\n", err.Error())
}

// Send back the updated vector clock on other
Expand Down Expand Up @@ -401,11 +401,11 @@ func (recv *Receiver) StoreIncMsgs(conn net.Conn) {
if err != nil {

if err.Error() == "EOF" {
log.Printf("[comm.StoreIncMsgs] Reading from closed connection. Ignoring.\n")
stdlog.Printf("[comm.StoreIncMsgs] Reading from closed connection. Ignoring.\n")
return
}

log.Fatalf("[comm.StoreIncMsgs] Error while reading sync message: %s\n", err.Error())
stdlog.Fatalf("[comm.StoreIncMsgs] Error while reading sync message: %s\n", err.Error())
}

// Remove trailing characters denoting line end.
Expand All @@ -418,20 +418,20 @@ func (recv *Receiver) StoreIncMsgs(conn net.Conn) {
// Write it to message log file.
_, err = recv.writeLog.WriteString(msgRaw)
if err != nil {
log.Fatalf("[comm.StoreIncMsgs] Writing to CRDT log file failed with: %s\n", err.Error())
stdlog.Fatalf("[comm.StoreIncMsgs] Writing to CRDT log file failed with: %s\n", err.Error())
}

// Append a newline symbol to just written line.
newline := []byte("\n")
_, err = recv.writeLog.Write(newline)
if err != nil {
log.Fatalf("[comm.StoreIncMsgs] Appending a newline symbol to CRDT log file failed with: %s\n", err.Error())
stdlog.Fatalf("[comm.StoreIncMsgs] Appending a newline symbol to CRDT log file failed with: %s\n", err.Error())
}

// Save to stable storage.
err = recv.writeLog.Sync()
if err != nil {
log.Fatalf("[comm.StoreIncMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
stdlog.Fatalf("[comm.StoreIncMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
}

// Unlock mutex.
Expand Down Expand Up @@ -485,7 +485,7 @@ func (recv *Receiver) ApplyStoredMsgs() {
// http://stackoverflow.com/a/30948278
info, err := recv.updLog.Stat()
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Could not get CRDT log file information: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not get CRDT log file information: %s\n", err.Error())
}

// Store accessed file size for multiple use.
Expand All @@ -501,7 +501,7 @@ func (recv *Receiver) ApplyStoredMsgs() {
// Save current position of head for later use.
curOffset, err := recv.updLog.Seek(0, os.SEEK_CUR)
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Error while retrieving current head position in CRDT log file: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error while retrieving current head position in CRDT log file: %s\n", err.Error())
}

// Calculate size of needed buffer.
Expand All @@ -515,7 +515,7 @@ func (recv *Receiver) ApplyStoredMsgs() {
// Reset position to beginning of file.
_, err = recv.updLog.Seek(0, os.SEEK_SET)
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Could not reset position in CRDT log file: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not reset position in CRDT log file: %s\n", err.Error())
}

// Unlock log file mutex.
Expand All @@ -537,13 +537,13 @@ func (recv *Receiver) ApplyStoredMsgs() {
// Copy contents of log file to prepared buffer.
_, err = io.Copy(buf, recv.updLog)
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Could not copy CRDT log file contents to buffer: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not copy CRDT log file contents to buffer: %s\n", err.Error())
}

// Read current message at head position from log file.
msgRaw, err := buf.ReadString('\n')
if (err != nil) && (err != io.EOF) {
log.Fatalf("[comm.ApplyStoredMsgs] Error during extraction of first line in CRDT log file: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error during extraction of first line in CRDT log file: %s\n", err.Error())
}

// Save length of just read message for later use.
Expand All @@ -552,7 +552,7 @@ func (recv *Receiver) ApplyStoredMsgs() {
// Parse sync message.
msg, err := Parse(msgRaw)
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Error while parsing sync message: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error while parsing sync message: %s\n", err.Error())
}

// Initially, set apply indicator to true. This means,
Expand Down Expand Up @@ -611,51 +611,51 @@ func (recv *Receiver) ApplyStoredMsgs() {
// Save updated vector clock to log file.
err := recv.SaveVClockEntries()
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Saving updated vector clock to file failed: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Saving updated vector clock to file failed: %s\n", err.Error())
}

// Reset head position to curOffset saved at beginning of loop.
_, err = recv.updLog.Seek(curOffset, os.SEEK_SET)
if err != nil {
log.Fatal(err)
stdlog.Fatal(err)
}

// Copy reduced buffer contents back to current position
// of CRDT log file, effectively deleting the read line.
newNumOfBytes, err := io.Copy(recv.updLog, buf)
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Error during copying buffer contents back to CRDT log file: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error during copying buffer contents back to CRDT log file: %s\n", err.Error())
}

// Now, truncate log file size to (curOffset + newNumOfBytes),
// reducing the file size by length of handled message.
err = recv.updLog.Truncate((curOffset + newNumOfBytes))
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Could not truncate CRDT log file: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not truncate CRDT log file: %s\n", err.Error())
}

// Sync changes to stable storage.
err = recv.updLog.Sync()
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
}

// Reset position to beginning of file because the
// chances are high that we now can proceed in order
// of CRDT message log file.
_, err = recv.updLog.Seek(0, os.SEEK_SET)
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Could not reset position in CRDT log file: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not reset position in CRDT log file: %s\n", err.Error())
}
} else {

log.Printf("[comm.ApplyStoredMsgs] Message was out of order. Next.\n")
stdlog.Printf("[comm.ApplyStoredMsgs] Message was out of order. Next.\n")

// Set position of head to byte after just read message,
// effectively delaying execution of that message.
_, err = recv.updLog.Seek((curOffset + msgRawLength), os.SEEK_SET)
if err != nil {
log.Fatalf("[comm.ApplyStoredMsgs] Error while moving position in CRDT log file to next line: %s\n", err.Error())
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error while moving position in CRDT log file to next line: %s\n", err.Error())
}
}

Expand Down
Loading

0 comments on commit e204b84

Please sign in to comment.