Fix resumable binlog position after interrupt#168
Conversation
2f562cb to
b3cd656
Compare
hkdsun
left a comment
There was a problem hiding this comment.
Love how simply you solved this ❤️ couple comments and then good-to-go I think
And excellent test case by @kolbitsch-lastline, thank you for the contribution!
| } | ||
|
|
||
| dmlEvs, err := NewBinlogDMLEvents(table, ev, pos) | ||
| s.lastResumableBinlogPosition.Name = s.lastStreamedBinlogPosition.Name |
There was a problem hiding this comment.
Wait, what? I don't think this should be legal 😄 jokes aside, mutating any binlog position sounds like a recipe for bugs down the line.. it's a bit like randomizing the coordinates you put in a GPS.. you might end up in middle of nowhere!
Would a test fail if I removed this line?
There was a problem hiding this comment.
I wouldn't recommend removing it 😄.
Instead of just duplicating all the places where lastStreamedBinlogPosition is set to also set this value, we just reuse the value like is already being done for the pos value in line 262.
Do you think it makes more sense to put it closer to the pos assignment? I don't think it would make much difference.
There was a problem hiding this comment.
Oh I see what's going on. This could use some cleanup, I'll PR something based off your branch.
There was a problem hiding this comment.
Ended up moving this assignment to the RotateEvent and in the initial connect further up. That allows us to remove it from this line here.
| // so there's no way to handle this for us. | ||
| continue | ||
| case *replication.XIDEvent, *replication.GTIDEvent: | ||
| s.lastResumableBinlogPosition.Pos = uint32(ev.Header.LogPos) |
There was a problem hiding this comment.
Let's document in code comments what got us to this? 🙏
2bb8be2 to
2df4712
Compare
2df4712 to
38f9c93
Compare
hkdsun
left a comment
There was a problem hiding this comment.
LGTM. Let's get Shuhao's +1 and 🚢
| // | ||
| // As a result, the following case will set the last resumable position for | ||
| // interruption to EITHER the start (if using GTIDs) or the end of the | ||
| // last transaction |
There was a problem hiding this comment.
Succinct explanation. Thank you!
| binlogStreamer *replication.BinlogStreamer | ||
| lastStreamedBinlogPosition mysql.Position | ||
| lastResumableBinlogPosition mysql.Position | ||
| targetBinlogPosition mysql.Position |
There was a problem hiding this comment.
We don't have to do this now, but this was brought up the other day: Since we will eventually start streaming binlog from the target for verification, maybe we should rename this to "stopAtBinlogPosition", or something to that effect.
I just wanted to mention this so we don't forget about it.
|
|
||
| ghostferry.on_status(Ghostferry::Status::AFTER_BINLOG_APPLY) do | ||
| # while we are emitting events in the loop above, try to inject a shutdown | ||
| # signal, hoping to interrupt between applying an INSERT and receiving the |
There was a problem hiding this comment.
So this test will be known to racy? Will it make CI more flaky?
There was a problem hiding this comment.
It has been consistently passing in my experience and hasn’t been flaky
|
|
||
| logrus.SetLevel(logrus.DebugLevel) | ||
| if os.Getenv("CI") == "true" { | ||
| logrus.SetLevel(logrus.ErrorLevel) |
There was a problem hiding this comment.
Can you remind me what this is for? I don't recall anymore.
There was a problem hiding this comment.
The log output from the binlog events is seemingly causing CI to truncate the output. This just lowers the logging level unless needed for the tests
| case *replication.RotateEvent: | ||
| // This event is needed because we need to update the last successful | ||
| // binlog position. | ||
| s.lastResumableBinlogPosition.Pos = uint32(e.Position) |
There was a problem hiding this comment.
Shouldn't we need to set the .Name here as well? This is a RotateEvent so the file should change, right? That would get rid of the slightly cryptic s.lastResumableBinlogPosition.Name line below?
There was a problem hiding this comment.
Following up on our conversation, we decided to set the name here and in the ConnectBinlogStreamerToMysqlFrom above 👍
c2a79fd to
d2dc8cc
Compare
| // This event is needed because we need to update the last successful | ||
| // binlog position. | ||
| s.lastResumableBinlogPosition.Pos = uint32(e.Position) | ||
| s.lastResumableBinlogPosition.Name = string(e.NextLogName) |
There was a problem hiding this comment.
Wait, I thought we agreed we can't resume from a RotateEvent's position?
There was a problem hiding this comment.
This is the only place where MySQL replication gives us the file name of the binlog. This value is therefore cached until the next Rotate event, which would give us the next file name. So we have to set this filename here?
There was a problem hiding this comment.
well, why can't we use a separate currentFilename variable to cache the current filename?
the current filename needs a clear separation from the resumable position.
Alternative to #160 (and also makes use of the same added test with slight modifications).
As described in the other PR, attempting to resume Ghostferry from an invalid binlog position will result in an error as the preceding
TableMapEventis required to properly translateRowsEvents.This adds a resumable position and interface to the
DMLEventthat theInlineVerifierandBinlogWriterpersist during interruption to be reused during resume.The last resumable position is stored as a member of the
BinlogStreamerand is periodically updated as theBinlogStreamersees new resumable events (GTIDEventsandXIDEvents). This position is then sent along with the correspondingDMLEvent. The resumable position is made available on theDMLEventsusing a new interface method -ResumableBinlogPosition./cc @Shopify/pods