Skip to content

Commit

Permalink
Revert #21 (#30)
Browse files Browse the repository at this point in the history
* Revert "Guard against negative index accesses for `offsets` slice"

This reverts commit 6d6400a.

* Revert "Shift the `offsets` and `partitionRefs` in case of database truncation (#21)"

This reverts commit 1269ce6.

* Clean up the rest of the lines
  • Loading branch information
M. Mert Yıldıran authored Mar 26, 2022
1 parent eef5bf2 commit 785f306
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 49 deletions.
83 changes: 36 additions & 47 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ var cdl CoreDumpLock
//
// removedOffsetsCounter is the counter of how many offsets are removed through size limiting.
//
// metaOffsetsLength is the length of offsets by ignoring size limiting for indexing.
//
// macros is the map of strings where the key is the macro and value is the expanded form.
//
// insertionFilter is the filter that's applied just before the insertion of every individual record.
Expand All @@ -160,8 +158,7 @@ type ConcurrentSliceV0 struct {
partitionIndex int64
partitionSizeLimit int64
truncatedTimestamp int64
removedOffsetsCounter uint64
metaOffsetsLength uint64
removedOffsetsCounter int
macros map[string]string
insertionFilter string
insertionFilterExpr *basenine.Expression
Expand All @@ -177,8 +174,7 @@ type ConcurrentSliceV0Export struct {
PartitionIndex int64
PartitionSizeLimit int64
TruncatedTimestamp int64
RemovedOffsetsCounter uint64
MetaOffsetsLength uint64
RemovedOffsetsCounter int
Macros map[string]string
InsertionFilter string
}
Expand Down Expand Up @@ -355,7 +351,6 @@ func dumpCore(silent bool, dontLock bool) (err error) {
csExport.PartitionSizeLimit = cs.partitionSizeLimit
csExport.TruncatedTimestamp = cs.truncatedTimestamp
csExport.RemovedOffsetsCounter = cs.removedOffsetsCounter
csExport.MetaOffsetsLength = cs.metaOffsetsLength
csExport.Macros = cs.macros
csExport.InsertionFilter = cs.insertionFilter
if !dontLock {
Expand Down Expand Up @@ -421,7 +416,6 @@ func restoreCore() (err error) {
cs.partitionSizeLimit = csExport.PartitionSizeLimit
cs.truncatedTimestamp = csExport.TruncatedTimestamp
cs.removedOffsetsCounter = csExport.RemovedOffsetsCounter
cs.metaOffsetsLength = csExport.MetaOffsetsLength
cs.macros = csExport.Macros
cs.insertionFilter = csExport.InsertionFilter
cs.insertionFilterExpr, _, _ = prepareQuery(cs.insertionFilter)
Expand Down Expand Up @@ -456,23 +450,23 @@ func getLastTimestampOfPartition(discardedPartitionIndex int64) (timestamp int64
partitionRefs := cs.partitionRefs
cs.RUnlock()

var removedOffsetsCounter uint64
var prevIndex int
var removedOffsetsCounter int
for i := range offsets {
if partitionRefs[i] > discardedPartitionIndex {
break
}
prevIndex = i
removedOffsetsCounter++
}

cs.Lock()
cs.offsets = cs.offsets[removedOffsetsCounter:]
cs.partitionRefs = cs.partitionRefs[removedOffsetsCounter:]
cs.removedOffsetsCounter += removedOffsetsCounter
cs.removedOffsetsCounter = removedOffsetsCounter
cs.Unlock()

var n int64
var f *os.File
n, f, err = getOffsetAndPartition(0)
n, f, err = getOffsetAndPartition(prevIndex)

if err != nil {
return
Expand Down Expand Up @@ -770,7 +764,7 @@ func insertData(data []byte) {
var lastOffset int64
// Safely access the last offset and current partition.
cs.Lock()
l := cs.metaOffsetsLength
l := len(cs.offsets)
lastOffset = cs.lastOffset
f := cs.partitions[cs.partitionIndex]

Expand All @@ -789,7 +783,6 @@ func insertData(data []byte) {
cs.offsets = append(cs.offsets, lastOffset)
cs.partitionRefs = append(cs.partitionRefs, cs.partitionIndex)
cs.lastOffset = lastOffset + 8 + length
cs.metaOffsetsLength++

// Release the lock
cs.Unlock()
Expand Down Expand Up @@ -935,7 +928,7 @@ func handleNegativeLeftOff(leftOff int64) int64 {
// If leftOff value is -1 then set it to last offset
if leftOff < 0 {
cs.RLock()
lastOffset := cs.metaOffsetsLength - 1
lastOffset := len(cs.offsets) - 1
cs.RUnlock()
leftOff = int64(lastOffset)
if leftOff < 0 {
Expand Down Expand Up @@ -977,6 +970,9 @@ func streamRecords(conn net.Conn, data []byte) (err error) {
// Number of queried records
var queried uint64 = 0

	// removedOffsetsCounter keeps track of how many offsets belong to a removed partition.
var removedOffsetsCounter int

for {
// f is the current partition we're reading the data from.
var f *os.File
Expand All @@ -992,14 +988,11 @@ func streamRecords(conn net.Conn, data []byte) (err error) {

// Safely access the next part of offsets and partition references.
cs.RLock()
xLeftOff := leftOff - int64(cs.removedOffsetsCounter)
if xLeftOff < 0 || int(xLeftOff) > len(cs.offsets) {
xLeftOff = 0
}
subOffsets := cs.offsets[xLeftOff:]
subPartitionRefs := cs.partitionRefs[xLeftOff:]
totalNumberOfRecords := cs.metaOffsetsLength - cs.removedOffsetsCounter
subOffsets := cs.offsets[leftOff:]
subPartitionRefs := cs.partitionRefs[leftOff:]
totalNumberOfRecords := len(cs.offsets)
truncatedTimestamp := cs.truncatedTimestamp
removedOffsetsCounter = cs.removedOffsetsCounter
cs.RUnlock()

// Disable rlimit if it's bigger than the total records.
Expand All @@ -1019,7 +1012,7 @@ func streamRecords(conn net.Conn, data []byte) (err error) {
cs.RLock()
partitionRef = subPartitionRefs[i]
fRef := cs.partitions[partitionRef]
totalNumberOfRecords = cs.metaOffsetsLength
totalNumberOfRecords = len(cs.offsets)
truncatedTimestamp = cs.truncatedTimestamp
cs.RUnlock()

Expand Down Expand Up @@ -1077,10 +1070,13 @@ func streamRecords(conn net.Conn, data []byte) (err error) {
}
}

// Correct the metadata values by subtracting removedOffsetsCounter
realTotal := totalNumberOfRecords - removedOffsetsCounter

metadata = &Metadata{
NumberOfWritten: numberOfWritten,
Current: uint64(queried),
Total: totalNumberOfRecords,
Total: uint64(realTotal),
LeftOff: uint64(leftOff),
TruncatedTimestamp: truncatedTimestamp,
}
Expand Down Expand Up @@ -1137,22 +1133,17 @@ func retrieveSingle(conn net.Conn, args []string) (err error) {

// Safely access the length of offsets slice.
cs.RLock()
l := cs.metaOffsetsLength
removedOffsetsCounter := cs.removedOffsetsCounter
l := len(cs.offsets)
cs.RUnlock()

// Check if the index is in the offsets slice.
if uint64(index) > l {
if index > l {
conn.Write([]byte(fmt.Sprintf("Index out of range: %d\n", index)))
return
}

	// Safely access the offsets and partition references
xLeftOff := index - int(removedOffsetsCounter)
if xLeftOff < 0 {
xLeftOff = 0
}
n, f, err := getOffsetAndPartition(xLeftOff)
n, f, err := getOffsetAndPartition(index)

// Record can only be removed if the partition of the record
// that it belongs to is removed. Therefore a file open error
Expand Down Expand Up @@ -1217,11 +1208,11 @@ func fetch(conn net.Conn, args []string) {

// Safely access the length of offsets slice.
cs.RLock()
l := cs.metaOffsetsLength
l := len(cs.offsets)
cs.RUnlock()

// Check if the leftOff is in the offsets slice.
if uint64(leftOff) > l {
if int(leftOff) > l {
conn.Write([]byte(fmt.Sprintf("Index out of range: %d\n", leftOff)))
return
}
Expand Down Expand Up @@ -1250,22 +1241,20 @@ func fetch(conn net.Conn, args []string) {
// Safely access the next part of offsets and partition references.
var subOffsets []int64
var subPartitionRefs []int64
var totalNumberOfRecords uint64
var totalNumberOfRecords int
var truncatedTimestamp int64
var removedOffsetsCounter int
cs.RLock()
totalNumberOfRecords = cs.metaOffsetsLength
totalNumberOfRecords = len(cs.offsets)
truncatedTimestamp = cs.truncatedTimestamp
xLeftOff := leftOff - int64(cs.removedOffsetsCounter)
if xLeftOff < 0 {
xLeftOff = 0
}
if direction < 0 {
subOffsets = cs.offsets[:xLeftOff]
subPartitionRefs = cs.partitionRefs[:xLeftOff]
subOffsets = cs.offsets[:leftOff]
subPartitionRefs = cs.partitionRefs[:leftOff]
} else {
subOffsets = cs.offsets[xLeftOff:]
subPartitionRefs = cs.partitionRefs[xLeftOff:]
subOffsets = cs.offsets[leftOff:]
subPartitionRefs = cs.partitionRefs[leftOff:]
}
removedOffsetsCounter = cs.removedOffsetsCounter
cs.RUnlock()

var metadata []byte
Expand All @@ -1276,7 +1265,7 @@ func fetch(conn net.Conn, args []string) {
metadata, _ = json.Marshal(Metadata{
NumberOfWritten: numberOfWritten,
Current: uint64(queried),
Total: totalNumberOfRecords,
Total: uint64(totalNumberOfRecords - removedOffsetsCounter),
LeftOff: uint64(leftOff),
TruncatedTimestamp: truncatedTimestamp,
})
Expand Down Expand Up @@ -1353,7 +1342,7 @@ func fetch(conn net.Conn, args []string) {
metadata, _ = json.Marshal(Metadata{
NumberOfWritten: numberOfWritten,
Current: uint64(queried),
Total: uint64(totalNumberOfRecords),
Total: uint64(totalNumberOfRecords - removedOffsetsCounter),
LeftOff: uint64(leftOff),
TruncatedTimestamp: truncatedTimestamp,
})
Expand Down
2 changes: 0 additions & 2 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,6 @@ func TestServerFlush(t *testing.T) {
assert.Empty(t, cs.partitionSizeLimit)
assert.Empty(t, cs.truncatedTimestamp)
assert.Empty(t, cs.removedOffsetsCounter)
assert.Empty(t, cs.metaOffsetsLength)
assert.Equal(t, cs.macros, macros)
assert.Equal(t, cs.insertionFilter, insertionFilter)
assert.Equal(t, cs.insertionFilterExpr, insertionFilterExpr)
Expand Down Expand Up @@ -798,7 +797,6 @@ func TestServerReset(t *testing.T) {
assert.Empty(t, cs.partitionSizeLimit)
assert.Empty(t, cs.truncatedTimestamp)
assert.Empty(t, cs.removedOffsetsCounter)
assert.Empty(t, cs.metaOffsetsLength)
assert.Empty(t, cs.macros)
assert.Empty(t, cs.insertionFilter)
assert.Empty(t, cs.insertionFilterExpr)
Expand Down

0 comments on commit 785f306

Please sign in to comment.