Skip to content

Commit

Permalink
global rebalance vs out-of-band updates (major)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 10, 2024
1 parent 2ac6b70 commit 312a648
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions reb/recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,55 @@ func (reb *Reb) recvObjRegular(hdr *transport.ObjHdr, smap *meta.Smap, unpacker
nlog.Errorf("%s g[%d]: early receive from %s %s (stage %s)", core.T, reb.RebID(), meta.Tname(tsid), lom, stages[stage])
}

//
// when destination exists
//
if lom.Load(false, false) == nil {
if lom.CheckEq(&hdr.ObjAttrs) == nil {
// no-op: optimize-out duplicated write
cos.DrainReader(objReader)
return reb.regACK(smap, hdr, tsid)
}
if lom.Bck().IsRemote() {
oa, ecode, err := core.T.HeadCold(lom, nil)
if err == nil {
// receive latest version from tsid
if oa.CheckEq(&hdr.ObjAttrs) == nil {
goto rx // go ahead to overwrite this lom
}
}
if cos.IsNotExist(err, ecode) && lom.VersionConf().Sync {
// try to delete in place (TODO: compare with lom.CheckRemoteMD; unify)
locked := lom.TryLock(true)
errDel := lom.RemoveObj(true)
if locked {
lom.Unlock(true)
}
if errDel != nil {
nlog.Errorf("%s g[%d]: failed to sync-delete %s: %v", core.T, reb.RebID(), lom, errDel)
} else if cmn.Rom.FastV(5, cos.SmoduleReb) {
nlog.Infof("%s g[%d]: sync-deleted %s", core.T, reb.RebID(), lom)
}
}
}

// cannot choose between the source and the destination
if cmn.Rom.FastV(5, cos.SmoduleReb) {
nlog.Warningf("%s g[%d]: recv ambiguity (%s, %s) vs (%s) - dropping, discarding", core.T, reb.RebID(),
lom, lom.ObjAttrs().String(), hdr.ObjAttrs.String())
}
cos.DrainReader(objReader)
return reb.regACK(smap, hdr, tsid)
}

rx:
lom.CopyAttrs(&hdr.ObjAttrs, true /*skip-checksum*/) // see "PUT is a no-op"

xreb := reb.xctn()
if xreb.IsAborted() {
return nil
}

params := core.AllocPutParams()
{
params.WorkTag = fs.WorkfilePut
Expand All @@ -194,6 +238,10 @@ func (reb *Reb) recvObjRegular(hdr *transport.ObjHdr, smap *meta.Smap, unpacker
xreb.InObjsAdd(1, hdr.ObjAttrs.Size)

// ACK
return reb.regACK(smap, hdr, tsid)
}

func (reb *Reb) regACK(smap *meta.Smap, hdr *transport.ObjHdr, tsid string) error {
tsi := smap.GetTarget(tsid)
if tsi == nil {
err := fmt.Errorf("g[%d]: %s is not in the %s", reb.RebID(), meta.Tname(tsid), smap)
Expand Down

0 comments on commit 312a648

Please sign in to comment.