Skip to content

Commit

Permalink
new delete state to track keys while writing
Browse files Browse the repository at this point in the history
  • Loading branch information
gregjhogan committed Sep 28, 2023
1 parent 451d248 commit 06e6371
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
10 changes: 8 additions & 2 deletions src/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
NO Deleted = 0
SOFT Deleted = 1
HARD Deleted = 2
INIT Deleted = 3 // data not written yet
)

type Record struct {
Expand All @@ -34,7 +35,10 @@ func toRecord(data []byte) Record {
var rec Record
ss := string(data)
rec.deleted = NO
if strings.HasPrefix(ss, "DELETED") {
if strings.HasPrefix(ss, "INIT") {
rec.deleted = INIT
ss = ss[3:]
} else if strings.HasPrefix(ss, "DELETED") {
rec.deleted = SOFT
ss = ss[7:]
}
Expand All @@ -51,7 +55,9 @@ func fromRecord(rec Record) []byte {
if rec.deleted == HARD {
panic("Can't put HARD delete in the database")
}
if rec.deleted == SOFT {
if rec.deleted == INIT {
cc = "INIT"
} else if rec.deleted == SOFT {
cc = "DELETED"
}
if len(rec.hash) == 32 {
Expand Down
9 changes: 6 additions & 3 deletions src/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (a *App) QueryHandler(key []byte, w http.ResponseWriter, r *http.Request) {
// operation is first query parameter (e.g. ?list&limit=10)
operation := strings.Split(r.URL.RawQuery, "&")[0]
switch operation {
case "list", "unlinked":
case "list", "writing", "unlinked":
start := r.URL.Query().Get("start")
limit := 0
qlimit := r.URL.Query().Get("limit")
Expand All @@ -73,6 +73,7 @@ func (a *App) QueryHandler(key []byte, w http.ResponseWriter, r *http.Request) {
for iter.Next() {
rec := toRecord(iter.Value())
if (rec.deleted != NO && operation == "list") ||
(rec.deleted != INIT && operation == "writing") ||
(rec.deleted != SOFT && operation == "unlinked") {
continue
}
Expand Down Expand Up @@ -146,7 +147,7 @@ func (a *App) WriteToReplicas(key []byte, value io.Reader, valuelen int64) int {
kvolumes := key2volume(key, a.volumes, a.replicas, a.subvolumes)

// push to leveldb initially as deleted, and without a hash since we don't have it yet
if !a.PutRecord(key, Record{kvolumes, SOFT, ""}) {
if !a.PutRecord(key, Record{kvolumes, INIT, ""}) {
return 500
}

Expand All @@ -162,6 +163,8 @@ func (a *App) WriteToReplicas(key []byte, value io.Reader, valuelen int64) int {
if remote_put(remote, valuelen, body) != nil {
// we assume the remote wrote nothing if it failed
fmt.Printf("replica %d write failed: %s\n", i, remote)
// try not to leave key in INIT (writing) state
a.PutRecord(key, Record{kvolumes, SOFT, hash})
return 500
}
}
Expand Down Expand Up @@ -212,7 +215,7 @@ func (a *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// note that the hash is always of the whole file, not the content requested
w.Header().Set("Content-Md5", rec.hash)
}
if rec.deleted == SOFT || rec.deleted == HARD {
if rec.deleted != NO {
if a.fallback == "" {
w.Header().Set("Content-Length", "0")
w.WriteHeader(404)
Expand Down

0 comments on commit 06e6371

Please sign in to comment.