Skip to content

Commit 15e9087

Browse files
committed
Time travel pragma.
1 parent 7028e3a commit 15e9087

4 files changed

Lines changed: 128 additions & 39 deletions

File tree

embed/bcw2/build.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ if [[ "$OSTYPE" == "msys" || "$OSTYPE" == "cygwin" ]]; then
2323
MSYS_NO_PATHCONV=1 nmake /f makefile.msc sqlite3.c "OPTS=-DSQLITE_ENABLE_UPDATE_DELETE_LIMIT -DSQLITE_ENABLE_ORDERED_SET_AGGREGATES"
2424
else
2525
sh configure --enable-update-limit
26+
make verify-source
2627
OPTS=-DSQLITE_ENABLE_ORDERED_SET_AGGREGATES make sqlite3.c
2728
fi
2829
cd ~-

litestream/vfs.go

Lines changed: 126 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"strconv"
10+
"strings"
1011
"sync"
1112
"time"
1213

@@ -62,19 +63,22 @@ type liteFile struct {
6263
db *liteDB
6364
conn *sqlite3.Conn
6465
pages *pageIndex
66+
syncTime time.Time
6567
txid ltx.TXID
6668
pageSize uint32
69+
locked bool
6770
}
6871

6972
func (f *liteFile) Close() error { return nil }
7073

7174
func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) {
7275
ctx := f.context()
7376
pages, txid := f.pages, f.txid
74-
if pages == nil {
77+
if pages == nil && f.syncTime.IsZero() {
7578
pages, txid, err = f.db.pollReplica(ctx)
7679
}
7780
if err != nil {
81+
f.db.opts.Logger.Error("poll replica", "error", err)
7882
return 0, err
7983
}
8084

@@ -135,14 +139,25 @@ func (f *liteFile) Size() (size int64, err error) {
135139

136140
func (f *liteFile) Lock(lock vfs.LockLevel) (err error) {
137141
if lock >= vfs.LOCK_RESERVED {
142+
// notest // OPEN_READONLY
138143
return sqlite3.IOERR_LOCK
139144
}
140-
f.pages, f.txid, err = f.db.pollReplica(f.context())
145+
if f.syncTime.IsZero() {
146+
f.pages, f.txid, err = f.db.pollReplica(f.context())
147+
}
148+
if err != nil {
149+
f.db.opts.Logger.Error("poll replica", "error", err)
150+
} else {
151+
f.locked = true
152+
}
141153
return err
142154
}
143155

144156
func (f *liteFile) Unlock(lock vfs.LockLevel) error {
145-
f.pages, f.txid = nil, 0
157+
if f.syncTime.IsZero() {
158+
f.pages, f.txid = nil, 0
159+
}
160+
f.locked = false
146161
return nil
147162
}
148163

@@ -166,7 +181,6 @@ func (f *liteFile) Pragma(name, value string) (string, error) {
166181
case "litestream_txid":
167182
txid := f.txid
168183
if txid == 0 {
169-
// Outside transaction.
170184
f.db.mtx.Lock()
171185
txid = f.db.txids[0]
172186
f.db.mtx.Unlock()
@@ -179,11 +193,45 @@ func (f *liteFile) Pragma(name, value string) (string, error) {
179193
f.db.mtx.Unlock()
180194

181195
if lastPoll.IsZero() {
182-
// Never polled successfully.
183196
return "-1", nil
184197
}
185198
lag := time.Since(lastPoll) / time.Second
186199
return strconv.FormatInt(int64(lag), 10), nil
200+
201+
case "litestream_time":
202+
if value == "" {
203+
syncTime := f.syncTime
204+
if syncTime.IsZero() {
205+
f.db.mtx.Lock()
206+
syncTime = f.db.lastInfo
207+
f.db.mtx.Unlock()
208+
}
209+
if syncTime.IsZero() {
210+
return "latest", nil
211+
}
212+
return syncTime.Format(time.RFC3339Nano), nil
213+
}
214+
215+
if !f.locked {
216+
return "", sqlite3.MISUSE
217+
}
218+
219+
if strings.EqualFold(value, "latest") {
220+
f.syncTime = time.Time{}
221+
f.pages, f.txid = nil, 0
222+
return "", nil
223+
}
224+
225+
syncTime, err := sqlite3.TimeFormatAuto.Decode(value)
226+
if err != nil {
227+
return "", err
228+
}
229+
230+
err = f.buildIndex(f.context(), syncTime)
231+
if err != nil {
232+
f.db.opts.Logger.Error("build index", "error", err)
233+
}
234+
return "", err
187235
}
188236

189237
return "", sqlite3.NOTFOUND
@@ -200,27 +248,53 @@ func (f *liteFile) context() context.Context {
200248
return context.Background()
201249
}
202250

251+
func (f *liteFile) buildIndex(ctx context.Context, syncTime time.Time) error {
252+
// Build the index from scratch from a Litestream restore plan.
253+
infos, err := litestream.CalcRestorePlan(ctx, f.db.client, 0, syncTime, f.db.opts.Logger)
254+
if err != nil {
255+
if !errors.Is(err, litestream.ErrTxNotAvailable) {
256+
return fmt.Errorf("calc restore plan: %w", err)
257+
}
258+
return nil
259+
}
260+
261+
var txid ltx.TXID
262+
var pages *pageIndex
263+
for _, info := range infos {
264+
pages, err = fetchPageIndex(ctx, pages, f.db.client, info)
265+
if err != nil {
266+
return err
267+
}
268+
txid = max(txid, info.MaxTXID)
269+
}
270+
f.syncTime = syncTime
271+
f.pages = pages
272+
f.txid = txid
273+
return nil
274+
}
275+
203276
type liteDB struct {
204277
client litestream.ReplicaClient
205278
opts ReplicaOptions
206279
cache pageCache
207280
pages *pageIndex // +checklocks:mtx
208281
lastPoll time.Time // +checklocks:mtx
282+
lastInfo time.Time // +checklocks:mtx
209283
txids levelTXIDs // +checklocks:mtx
210284
mtx sync.Mutex
211285
}
212286

213-
func (f *liteDB) buildIndex(ctx context.Context) error {
214-
f.mtx.Lock()
215-
defer f.mtx.Unlock()
287+
func (d *liteDB) buildIndex(ctx context.Context) error {
288+
d.mtx.Lock()
289+
defer d.mtx.Unlock()
216290

217291
// Skip if we already have an index.
218-
if f.pages != nil {
292+
if d.pages != nil {
219293
return nil
220294
}
221295

222296
// Build the index from scratch from a Litestream restore plan.
223-
infos, err := litestream.CalcRestorePlan(ctx, f.client, 0, time.Time{}, f.opts.Logger)
297+
infos, err := litestream.CalcRestorePlan(ctx, d.client, 0, time.Time{}, d.opts.Logger)
224298
if err != nil {
225299
if !errors.Is(err, litestream.ErrTxNotAvailable) {
226300
return fmt.Errorf("calc restore plan: %w", err)
@@ -229,47 +303,46 @@ func (f *liteDB) buildIndex(ctx context.Context) error {
229303
}
230304

231305
for _, info := range infos {
232-
err := f.updateInfo(ctx, info)
306+
err := d.updateInfo(ctx, info)
233307
if err != nil {
234308
return err
235309
}
236310
}
237311

238-
f.lastPoll = time.Now()
312+
d.lastPoll = time.Now()
239313
return nil
240314
}
241315

242-
func (f *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) {
243-
f.mtx.Lock()
244-
defer f.mtx.Unlock()
316+
func (d *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error) {
317+
d.mtx.Lock()
318+
defer d.mtx.Unlock()
245319

246320
// Limit polling interval.
247-
if time.Since(f.lastPoll) < f.opts.PollInterval {
248-
return f.pages, f.txids[0], nil
321+
if time.Since(d.lastPoll) < d.opts.PollInterval {
322+
return d.pages, d.txids[0], nil
249323
}
250324

251325
for level := range []int{0, 1, litestream.SnapshotLevel} {
252-
if err := f.updateLevel(ctx, level); err != nil {
253-
f.opts.Logger.Error("cannot poll replica", "error", err)
326+
if err := d.updateLevel(ctx, level); err != nil {
254327
return nil, 0, err
255328
}
256329
}
257330

258-
f.lastPoll = time.Now()
259-
return f.pages, f.txids[0], nil
331+
d.lastPoll = time.Now()
332+
return d.pages, d.txids[0], nil
260333
}
261334

262-
// +checklocks:f.mtx
263-
func (f *liteDB) updateLevel(ctx context.Context, level int) error {
335+
// +checklocks:d.mtx
336+
func (d *liteDB) updateLevel(ctx context.Context, level int) error {
264337
var nextTXID ltx.TXID
265338
// Snapshots must start from scratch,
266339
// other levels can start from where they were left.
267340
if level != litestream.SnapshotLevel {
268-
nextTXID = f.txids[level] + 1
341+
nextTXID = d.txids[level] + 1
269342
}
270343

271344
// Start reading from the next LTX file after the current position.
272-
itr, err := f.client.LTXFiles(ctx, level, nextTXID, false)
345+
itr, err := d.client.LTXFiles(ctx, level, nextTXID, false)
273346
if err != nil {
274347
return fmt.Errorf("ltx files: %w", err)
275348
}
@@ -280,11 +353,11 @@ func (f *liteDB) updateLevel(ctx context.Context, level int) error {
280353
info := itr.Item()
281354

282355
// Skip LTX files already fully loaded into the index.
283-
if info.MaxTXID <= f.txids[level] {
356+
if info.MaxTXID <= d.txids[level] {
284357
continue
285358
}
286359

287-
err := f.updateInfo(ctx, info)
360+
err := d.updateInfo(ctx, info)
288361
if err != nil {
289362
return err
290363
}
@@ -295,26 +368,41 @@ func (f *liteDB) updateLevel(ctx context.Context, level int) error {
295368
return itr.Close()
296369
}
297370

298-
// +checklocks:f.mtx
299-
func (f *liteDB) updateInfo(ctx context.Context, info *ltx.FileInfo) error {
300-
idx, err := litestream.FetchPageIndex(ctx, f.client, info)
371+
// +checklocks:d.mtx
372+
func (d *liteDB) updateInfo(ctx context.Context, info *ltx.FileInfo) error {
373+
pages, err := fetchPageIndex(ctx, d.pages, d.client, info)
301374
if err != nil {
302-
return fmt.Errorf("fetch page index: %w", err)
375+
return err
376+
}
377+
378+
// Track the MaxTXID for each level.
379+
maxTXID := &d.txids[info.Level]
380+
*maxTXID = max(*maxTXID, info.MaxTXID)
381+
d.txids[0] = max(d.txids[0], *maxTXID)
382+
if d.lastInfo.Before(info.CreatedAt) {
383+
d.lastInfo = info.CreatedAt
384+
}
385+
d.pages = pages
386+
return nil
387+
}
388+
389+
func fetchPageIndex(
390+
ctx context.Context, pages *pageIndex,
391+
client litestream.ReplicaClient, info *ltx.FileInfo) (*pageIndex, error) {
392+
393+
idx, err := litestream.FetchPageIndex(ctx, client, info)
394+
if err != nil {
395+
return nil, fmt.Errorf("fetch page index: %w", err)
303396
}
304397

305398
// Replace pages in the index with new pages.
306399
for k, v := range idx {
307400
// Patch avoids mutating the index for an unmodified page.
308-
f.pages = f.pages.Patch(k, func(node *pageIndex) (ltx.PageIndexElem, bool) {
401+
pages = pages.Patch(k, func(node *pageIndex) (ltx.PageIndexElem, bool) {
309402
return v, node == nil || v != node.Value()
310403
})
311404
}
312-
313-
// Track the MaxTXID for each level.
314-
maxTXID := &f.txids[info.Level]
315-
*maxTXID = max(*maxTXID, info.MaxTXID)
316-
f.txids[0] = max(f.txids[0], *maxTXID)
317-
return nil
405+
return pages, nil
318406
}
319407

320408
// Type aliases; these are a mouthful.

vfs/vfs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,9 @@ func vfsFileControlImpl(ctx context.Context, mod api.Module, file File, op _Fcnt
329329

330330
case _FCNTL_PRAGMA:
331331
if file, ok := file.(FilePragma); ok {
332+
var value string
332333
ptr := util.Read32[ptr_t](mod, pArg+1*ptrlen)
333334
name := util.ReadString(mod, ptr, _MAX_SQL_LENGTH)
334-
var value string
335335
if ptr := util.Read32[ptr_t](mod, pArg+2*ptrlen); ptr != 0 {
336336
value = util.ReadString(mod, ptr, _MAX_SQL_LENGTH)
337337
}

0 commit comments

Comments
 (0)