// stageRaftCommand stages a committed Raft entry into the provided batch,
// evaluating forced errors, acquiring split/merge locks, running pre-apply
// triggers, and applying side effects (AddSSTable, split, merge, logical op
// log) before the batch is committed by the caller.
// NOTE(review): requires raftMu to be held — implied by the *RaftMuLocked
// helpers it calls; confirm against callers.
func (r *Replica) stageRaftCommand(
	ctx context.Context,
	cmd *cmdAppCtx,
	batch engine.Batch,
	replicaState *storagepb.ReplicaState,
	writeAppliedState bool,
) {
	// A zero index is never valid for an applied entry.
	if cmd.e.Index == 0 {
		log.Fatalf(ctx, "processRaftCommand requires a non-zero index")
	}
	if log.V(4) {
		log.Infof(ctx, "processing command %x: maxLeaseIndex=%d", cmd.idKey, cmd.raftCmd.MaxLeaseIndex)
	}

	// Empty-ID commands (e.g. empty entries proposed on leader election)
	// carry no timestamp to feed into the local clock below.
	var ts hlc.Timestamp
	if cmd.idKey != "" {
		ts = cmd.replicatedResult().Timestamp
	}

	// Determine whether the command must be rejected deterministically on
	// all replicas (lease mismatch, lease index regression, etc.).
	cmd.leaseIndex, cmd.proposalRetry, cmd.forcedErr = checkForcedErr(ctx, cmd.idKey, cmd.raftCmd, cmd.proposal, cmd.proposedLocally(), replicaState)
	if cmd.forcedErr != nil {
		log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.forcedErr)
	} else {
		log.Event(ctx, "applying command")

		if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil {
			// Failure to take the split/merge lock is unexpected; file a
			// crash report asynchronously, then reject the command.
			log.Eventf(ctx, "unable to acquire split lock: %s", err)
			_ = r.store.stopper.RunAsyncTask(ctx, "crash report", func(ctx context.Context) {
				log.SendCrashReport(
					ctx,
					&r.store.cfg.Settings.SV,
					0, // depth
					"while acquiring split lock: %s",
					[]interface{}{err},
					log.ReportTypeError,
				)
			})

			cmd.forcedErr = roachpb.NewError(err)
		} else if splitMergeUnlock != nil {
			// Released later by the command's application lifecycle.
			cmd.splitMergeUnlock = splitMergeUnlock
		}
	}

	// Testing hook: a filter may inject a forced error and a proposal-retry
	// reason, but never overrides a retry reason already set above.
	if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr == nil && filter != nil {
		var newPropRetry int
		newPropRetry, cmd.forcedErr = filter(storagebase.ApplyFilterArgs{
			CmdID:                cmd.idKey,
			ReplicatedEvalResult: *cmd.replicatedResult(),
			StoreID:              r.store.StoreID(),
			RangeID:              r.RangeID,
		})
		if cmd.proposalRetry == 0 {
			cmd.proposalRetry = proposalReevaluationReason(newPropRetry)
		}
	}

	// A forced error neutralizes the command: strip its result, write batch,
	// and logical op log so nothing is applied.
	if cmd.forcedErr != nil {
		*cmd.replicatedResult() = storagepb.ReplicatedEvalResult{}
		cmd.raftCmd.WriteBatch = nil
		cmd.raftCmd.LogicalOpLog = nil
	}

	// Pre-apply triggers must run before the write batch is staged so they
	// can read pre-image state from the batch.
	if err := r.runPreApplyTriggersBeforeStagingWriteBatch(ctx, cmd, batch); err != nil {
		log.Errorf(ctx, "unable to update the state machine: %+v", err)
		log.Fatal(ctx, err)
	}
	// Advance the local clock to the command's timestamp.
	r.store.Clock().Update(ts)

	// Migrate a legacy stats delta into the current representation; both
	// being set at once indicates a bug.
	if deprecatedDelta := cmd.replicatedResult().DeprecatedDelta; deprecatedDelta != nil {
		if cmd.replicatedResult().Delta != (enginepb.MVCCStatsDelta{}) {
			log.Fatalf(ctx, "stats delta not empty but deprecated delta provided: %+v", cmd)
		}
		cmd.replicatedResult().Delta = deprecatedDelta.ToStatsDelta()
		cmd.replicatedResult().DeprecatedDelta = nil
	}

	// Stage the command's write batch into the engine batch. Note that the
	// command's own ctx (not this function's) is passed through here.
	err := r.applyRaftCommandToBatch(cmd.ctx, cmd, replicaState, batch, writeAppliedState)
	if err != nil {
		log.Errorf(ctx, "unable to update the state machine: %+v", err)
		log.Fatal(ctx, err)
	}

	// AddSSTable side effect: link or copy the sideloaded SST into the LSM.
	if cmd.replicatedResult().AddSSTable != nil {
		copied := addSSTablePreApply(
			ctx,
			r.store.cfg.Settings,
			r.store.engine,
			r.raftMu.sideloaded,
			cmd.e.Term,
			cmd.e.Index,
			*cmd.replicatedResult().AddSSTable,
			r.store.limiters.BulkIOWriteRate,
		)
		r.store.metrics.AddSSTableApplications.Inc(1)
		if copied {
			r.store.metrics.AddSSTableApplicationCopies.Inc(1)
		}
		cmd.replicatedResult().AddSSTable = nil
	}

	if cmd.replicatedResult().Split != nil {
		// Split work that must be staged in the same batch as the command.
		splitPreApply(ctx, batch, cmd.replicatedResult().Split.SplitTrigger)
	}

	if merge := cmd.replicatedResult().Merge; merge != nil {
		// The RHS replica is subsumed; mark it destroyed in this batch but
		// keep its data (the merged range now owns it).
		rhsRepl, err := r.store.GetReplica(merge.RightDesc.RangeID)
		if err != nil {
			log.Fatal(ctx, err)
		}
		const destroyData = false
		err = rhsRepl.preDestroyRaftMuLocked(ctx, batch, batch, merge.RightDesc.NextReplicaID, destroyData)
		if err != nil {
			log.Fatal(ctx, err)
		}
	}

	// Feed logical ops to the rangefeed. A logical op log without a write
	// batch is a contract violation.
	if cmd.raftCmd.WriteBatch != nil {
		r.handleLogicalOpLogRaftMuLocked(ctx, cmd.raftCmd.LogicalOpLog, batch)
	} else if cmd.raftCmd.LogicalOpLog != nil {
		log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd)
	}
}
func (r *Replica) runPreApplyTriggersBeforeStagingWriteBatch( ctx context.Context, cmd *cmdAppCtx, batch engine.Batch, ) error { if ops := cmd.raftCmd.LogicalOpLog; ops != nil { r.populatePrevValsInLogicalOpLogRaftMuLocked(ctx, ops, batch) } return nil }
// populatePrevValsInLogicalOpLogRaftMuLocked reads the pre-image of each
// value-carrying logical op from prevReader and stores it in the op's
// PrevValue field, for use by rangefeeds serving diffs. No-op if the replica
// has no active rangefeed or the op log is nil.
// NOTE(review): named RaftMuLocked — presumably requires raftMu; confirm.
func (r *Replica) populatePrevValsInLogicalOpLogRaftMuLocked(
	ctx context.Context, ops *storagepb.LogicalOpLog, prevReader engine.Reader,
) {
	if r.raftMu.rangefeed == nil {
		return
	}
	if ops == nil {
		return
	}

	for _, op := range ops.Ops {
		var key []byte
		var ts hlc.Timestamp
		var prevValPtr *[]byte
		switch t := op.GetValue().(type) {
		case *enginepb.MVCCWriteValueOp:
			key, ts, prevValPtr = t.Key, t.Timestamp, &t.PrevValue
		case *enginepb.MVCCCommitIntentOp:
			key, ts, prevValPtr = t.Key, t.Timestamp, &t.PrevValue
		case *enginepb.MVCCWriteIntentOp, *enginepb.MVCCUpdateIntentOp, *enginepb.MVCCAbortIntentOp, *enginepb.MVCCAbortTxnOp:
			// These op types carry no value, so they have no pre-image.
			continue
		default:
			panic(fmt.Sprintf("unknown logical op %T", t))
		}

		// Inconsistent read: intents are ignored rather than waited on;
		// Tombstones lets a deletion surface as a value with nil RawBytes.
		prevVal, _, err := engine.MVCCGet(
			ctx, prevReader, key, ts, engine.MVCCGetOptions{Tombstones: true, Inconsistent: true},
		)
		if err != nil {
			// Reading the pre-image failed; tear down the rangefeed rather
			// than serve incomplete diffs.
			r.disconnectRangefeedWithErrRaftMuLocked(roachpb.NewErrorf(
				"error consuming %T for key %v @ ts %v: %v", op, key, ts, err,
			))
			return
		}
		if prevVal != nil {
			*prevValPtr = prevVal.RawBytes
		} else {
			*prevValPtr = nil
		}
	}
}
// handleLogicalOpLogRaftMuLocked passes the logical op log of a committed
// command to the replica's rangefeed processor, first resolving each op's
// value bytes by reading from the supplied reader. A nil op log on a command
// that had a write batch means logical ops are missing (e.g. written before
// op logging was enabled), which forces rangefeed clients to retry.
func (r *Replica) handleLogicalOpLogRaftMuLocked(
	ctx context.Context, ops *storagepb.LogicalOpLog, reader engine.Reader,
) {
	if r.raftMu.rangefeed == nil {
		return
	}
	if ops == nil {
		// nil (as opposed to empty) op log: the command's writes are
		// invisible to the rangefeed, so clients must reconnect/catch up.
		r.disconnectRangefeedWithReasonRaftMuLocked(
			roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING,
		)
		return
	}
	if len(ops.Ops) == 0 {
		return
	}

	for _, op := range ops.Ops {
		var key []byte
		var ts hlc.Timestamp
		var valPtr *[]byte
		switch t := op.GetValue().(type) {
		case *enginepb.MVCCWriteValueOp:
			key, ts, valPtr = t.Key, t.Timestamp, &t.Value
		case *enginepb.MVCCCommitIntentOp:
			key, ts, valPtr = t.Key, t.Timestamp, &t.Value
		case *enginepb.MVCCWriteIntentOp, *enginepb.MVCCUpdateIntentOp, *enginepb.MVCCAbortIntentOp, *enginepb.MVCCAbortTxnOp:
			// No value attached to these op types.
			continue
		default:
			panic(fmt.Sprintf("unknown logical op %T", t))
		}

		// The value must exist in the reader, since the command that wrote
		// it was just staged; a miss is treated as an error.
		val, _, err := engine.MVCCGet(ctx, reader, key, ts, engine.MVCCGetOptions{Tombstones: true})
		if val == nil && err == nil {
			err = errors.New("value missing in reader")
		}
		if err != nil {
			r.disconnectRangefeedWithErrRaftMuLocked(roachpb.NewErrorf(
				"error consuming %T for key %v @ ts %v: %v", op, key, ts, err,
			))
			return
		}
		*valPtr = val.RawBytes
	}

	// Hand the ops to the processor; failure means its buffer overflowed
	// and the rangefeed must be reset.
	if !r.raftMu.rangefeed.ConsumeLogicalOps(ops.Ops...) {
		r.resetRangefeedRaftMuLocked()
	}
}
func (p *Processor) ConsumeLogicalOps(ops ...enginepb.MVICLogicalOp) bool { if p == nil { return true } if len(ops) == 0 { return true } return p.sendEvent(event{ops: ops}, p.EventChanTimeout) }
// sendEvent delivers e to the processor's event channel, returning false if
// the event was dropped. With a zero timeout it blocks indefinitely (or until
// the processor stops). Otherwise it first attempts a non-blocking send; if
// the channel is full it waits up to the timeout before declaring the buffer
// over capacity and stopping the processor.
func (p *Processor) sendEvent(e event, timeout time.Duration) bool {
	if timeout == 0 {
		// Block until the event is accepted or the processor stops.
		select {
		case p.eventC <- e:
		case <-p.stoppedC:
		}
	} else {
		select {
		case p.eventC <- e:
		case <-p.stoppedC:
		default:
			// Fast path failed (channel full); wait with a timeout.
			// Creating the timer only on this slow path avoids a
			// time.After allocation on every send.
			select {
			case p.eventC <- e:
			case <-p.stoppedC:
			case <-time.After(timeout):
				// The consumer is too slow; stop the processor so clients
				// reconnect rather than fall arbitrarily far behind.
				p.sendStop(newErrBufferCapacityExceeded())
				return false
			}
		}
	}
	return true
}
// Start launches the processor's event loop on a stopper worker. rtsIter, if
// non-nil, seeds an async scan that initializes the resolved timestamp;
// otherwise the resolved timestamp is initialized immediately. The loop owns
// all processor state: registrations, event consumption, and periodic pushes
// of old transactions that hold back the resolved timestamp.
func (p *Processor) Start(stopper *stop.Stopper, rtsIter engine.SimpleIterator) {
	ctx := p.AnnotateCtx(context.Background())
	stopper.RunWorker(ctx, func(ctx context.Context) {
		// Closing stoppedC signals every sender/registrant that the
		// processor is gone.
		defer close(p.stoppedC)
		ctx, cancelOutputLoops := context.WithCancel(ctx)
		defer cancelOutputLoops()

		// Initialize the resolved timestamp, via an async intent scan when
		// an iterator was provided.
		if rtsIter != nil {
			initScan := newInitResolvedTSScan(p, rtsIter)
			err := stopper.RunAsyncTask(ctx, "rangefeed: init resolved ts", initScan.Run)
			if err != nil {
				initScan.Cancel()
			}
		} else {
			p.initResolvedTS(ctx)
		}

		// Periodic ticker that triggers pushes of old transactions; only
		// armed when a push interval is configured.
		var txnPushTicker *time.Ticker
		var txnPushTickerC <-chan time.Time
		var txnPushAttemptC chan struct{}
		if p.PushTxnsInterval > 0 {
			txnPushTicker = time.NewTicker(p.PushTxnsInterval)
			txnPushTickerC = txnPushTicker.C
			defer txnPushTicker.Stop()
		}

		for {
			select {

			// New registration.
			case r := <-p.regC:
				if !p.Span.AsRawSpanWithNoLocals().Contains(r.span) {
					log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
				}

				p.reg.Register(&r)

				// Immediately publish a checkpoint so the client learns the
				// current resolved timestamp.
				r.publish(p.newCheckpointEvent())

				// Each registration gets its own output goroutine; when it
				// exits it unregisters itself through unregC.
				runOutputLoop := func(ctx context.Context) {
					r.runOutputLoop(ctx)
					select {
					case p.unregC <- &r:
					case <-p.stoppedC:
					}
				}
				if err := stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
					// Task refused (quiescing): clean up the registration
					// ourselves since the output loop never ran.
					if r.catchupIter != nil {
						r.catchupIter.Close()
					}
					r.disconnect(roachpb.NewError(err))
					p.reg.Unregister(&r)
				}

			// A registration's output loop finished.
			case r := <-p.unregC:
				p.reg.Unregister(r)

			// Introspection requests.
			case <-p.lenReqC:
				p.lenResC <- p.reg.Len()
			case <-p.filterReqC:
				p.filterResC <- p.reg.NewFilter()

			// Incoming data/checkpoint events.
			case e := <-p.eventC:
				p.consumeEvent(ctx, e)

			// Time to consider pushing old transactions.
			case <-txnPushTickerC:
				// Don't push until the resolved timestamp is initialized.
				if !p.rts.IsInit() {
					continue
				}

				now := p.Clock.Now()
				before := now.Add(-p.PushTxnsAge.Nanoseconds(), 0)
				oldTxns := p.rts.intentQ.Before(before)

				if len(oldTxns) > 0 {
					toPush := make([]enginepb.TxnMeta, len(oldTxns))
					for i, txn := range oldTxns {
						toPush[i] = txn.asTxnMeta()
					}

					// Disable the ticker while a push attempt is in flight;
					// txnPushAttemptC re-enables it on completion.
					txnPushTickerC = nil
					txnPushAttemptC = make(chan struct{})

					pushTxns := newTxnPushAttempt(p, toPush, now, txnPushAttemptC)
					err := stopper.RunAsyncTask(ctx, "rangefeed: pushing old txns", pushTxns.Run)
					if err != nil {
						pushTxns.Cancel()
					}
				}

			// Push attempt done: re-arm the ticker.
			case <-txnPushAttemptC:
				txnPushTickerC = txnPushTicker.C
				txnPushAttemptC = nil

			// Explicit stop with an error to propagate to clients.
			case pErr := <-p.stopC:
				p.reg.DisconnectWithErr(all, pErr)
				return

			// Server shutdown.
			case <-stopper.ShouldQuiesce():
				p.reg.Disconnect(all)
				return
			}
		}
	})
}
// consumeEvent dispatches a single event to the appropriate handler. Exactly
// one variant of the event union is expected to be set; the case order gives
// logical ops priority when inspecting the union.
func (p *Processor) consumeEvent(ctx context.Context, e event) {
	switch {
	case len(e.ops) > 0:
		p.consumeLogicalOps(ctx, e.ops)
	case e.ct != hlc.Timestamp{}:
		// Closed timestamp update.
		p.forwardClosedTS(ctx, e.ct)
	case e.initRTS:
		p.initResolvedTS(ctx)
	case e.syncC != nil:
		// Test-only synchronization: optionally wait for registrations to
		// catch up before signaling the waiter.
		if e.testRegCatchupSpan.Valid() {
			if err := p.reg.waitForCaughtUp(e.testRegCatchupSpan); err != nil {
				log.Errorf(
					ctx,
					"error waiting for registries to catch up during test, results might be impacted: %s",
					err,
				)
			}
		}
		close(e.syncC)
	default:
		panic("missing event variant")
	}
}
// consumeLogicalOps publishes value events for value-carrying ops and feeds
// every op into the resolved-timestamp tracker, emitting a checkpoint event
// whenever the resolved timestamp advances.
func (p *Processor) consumeLogicalOps(ctx context.Context, ops []enginepb.MVICLogicalOp) {
	for _, op := range ops {
		// Publish RangeFeedValue updates, if necessary.
		switch t := op.GetValue().(type) {
		case *enginepb.MVCCWriteValueOp:
			// Publish the new value directly.
			p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue)

		case *enginepb.MVCCWriteIntentOp:
			// Nothing to publish: the value is not visible until commit.

		case *enginepb.MVCCUpdateIntentOp:
			// Nothing to publish.

		case *enginepb.MVCCCommitIntentOp:
			// The intent committed; publish its value.
			p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue)

		case *enginepb.MVCCAbortIntentOp:
			// Nothing to publish.

		case *enginepb.MVCCAbortTxnOp:
			// Nothing to publish.

		default:
			panic(fmt.Sprintf("unknown logical op %T", t))
		}

		// Determine whether the operation caused the resolved timestamp to
		// move forward. If so, publish a RangeFeedCheckpoint notification.
		if p.rts.ConsumeLogicalOp(op) {
			p.publishCheckpoint(ctx)
		}
	}
}
// Register hands a new rangefeed registration to the processor's event loop.
// If the processor has already stopped, the catch-up iterator is closed here
// (the loop will never own it) and a closure error is delivered to errC on a
// best-effort basis.
func (p *Processor) Register(
	span roachpb.RSpan,
	startTS hlc.Timestamp,
	catchupIter engine.SimpleIterator,
	withDiff bool,
	stream Stream,
	errC chan<- *roachpb.Error,
) {
	// Drain pending events first so the registration's initial checkpoint
	// reflects everything already submitted.
	p.syncEventC()

	r := newRegistration(
		span.AsRawSpanWithNoLocals(), startTS, catchupIter, withDiff,
		p.Config.EventChanCap, p.Metrics, stream, errC,
	)
	select {
	case p.regC <- r:
	case <-p.stoppedC:
		if catchupIter != nil {
			catchupIter.Close() // clean up
		}
		// errC has a buffer of 1 at the caller; the default arm makes this
		// send non-blocking if it was already used.
		select {
		case errC <- roachpb.NewErrorf("rangefeed processor closed"):
		default:
		}
	}
}
// RangeFeed registers a rangefeed over the span in args on this replica and
// blocks until the feed terminates, returning the terminal error. A non-empty
// args.Timestamp requests a catch-up scan from that time, gated by a
// concurrency-limiting semaphore.
func (r *Replica) RangeFeed(
	args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
) *roachpb.Error {
	if !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
		return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See " +
			base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`))
	}
	ctx := r.AnnotateCtx(stream.Context())

	var rSpan roachpb.RSpan
	var err error
	rSpan.Key, err = keys.Addr(args.Span.Key)
	if err != nil {
		return roachpb.NewError(err)
	}
	rSpan.EndKey, err = keys.Addr(args.Span.EndKey)
	if err != nil {
		return roachpb.NewError(err)
	}

	if err := r.ensureClosedTimestampStarted(ctx); err != nil {
		return err
	}

	// Use the request timestamp for the execution check; fall back to "now"
	// when no catch-up timestamp was given.
	checkTS := args.Timestamp
	if checkTS.IsEmpty() {
		checkTS = r.Clock().Now()
	}

	// The stream is shared by concurrent producers, so serialize sends.
	lockedStream := &lockedRangefeedStream{wrapped: stream}
	errC := make(chan *roachpb.Error, 1)

	// A catch-up scan holds an iterator open for a while; acquire the
	// limiting semaphore first. iterSemRelease is nil'd out once ownership
	// transfers to the iterator's closer, so the deferred release only
	// fires on early-error paths.
	usingCatchupIter := false
	var iterSemRelease func()
	if !args.Timestamp.IsEmpty() {
		usingCatchupIter = true
		lim := &r.store.limiters.ConcurrentRangefeedIters
		if err := lim.Begin(ctx); err != nil {
			return roachpb.NewError(err)
		}
		iterSemRelease = lim.Finish
		defer func() {
			if iterSemRelease != nil {
				iterSemRelease()
			}
		}()
	}

	// raftMu serializes against command application and rangefeed
	// setup/teardown on this replica.
	r.raftMu.Lock()
	if err := r.checkExecutionCanProceedForRangeFeed(rSpan, checkTS); err != nil {
		r.raftMu.Unlock()
		return roachpb.NewError(err)
	}

	p := r.maybeInitRangefeedRaftMuLocked(ctx)

	var catchUpIter engine.SimpleIterator
	if usingCatchupIter {
		innerIter := r.Engine().NewIterator(engine.IterOptions{
			UpperBound: args.Span.EndKey,
		})
		// Closing the iterator also releases the semaphore.
		catchUpIter = iteratorWithCloser{
			SimpleIterator: innerIter,
			close:          iterSemRelease,
		}
		iterSemRelease = nil // ownership transferred to catchUpIter
	}
	p.Register(rSpan, args.Timestamp, catchUpIter, args.WithDiff, lockedStream, errC)
	r.raftMu.Unlock()

	// When the feed ends, tear down the processor if this was its last
	// registration.
	defer func() {
		r.raftMu.Lock()
		r.maybeDestroyRangefeedRaftMuLocked(p)
		r.raftMu.Unlock()
	}()

	// Block until the rangefeed is disconnected.
	return <-errC
}
// outputLoop streams buffered events to the registration's client. It first
// runs the catch-up scan (if any), then drains r.buf until cancellation, a
// send failure, or a detected buffer overflow (which is only surfaced once
// the buffer has fully drained, so no accepted event is lost).
func (r *registration) outputLoop(ctx context.Context) error {
	// If the registration has a catch-up scan, run it.
	if r.catchupIter != nil {
		if err := r.runCatchupScan(); err != nil {
			err = errors.Wrap(err, "catch-up scan failed")
			log.Error(ctx, err)
			return err
		}
	}

	for {
		// Check the overflow flag only when the buffer is empty: events
		// accepted before the overflow must still be delivered first.
		overflowed := false
		r.mu.Lock()
		if len(r.buf) == 0 {
			overflowed = r.mu.overflowed
			r.mu.caughtUp = true
		}
		r.mu.Unlock()
		if overflowed {
			return newErrBufferCapacityExceeded().GoError()
		}

		select {
		case nextEvent := <-r.buf:
			if err := r.stream.Send(nextEvent); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		case <-r.stream.Context().Done():
			return r.stream.Context().Err()
		}
	}
}
// RangeFeed routes a rangefeed request to the replica for args.RangeID. An
// uninitialized replica cannot serve a feed, so the client is redirected via
// a NotLeaseHolderError that hints at the creating replica as a possible
// lease holder.
func (s *Store) RangeFeed(
	args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
) *roachpb.Error {
	if err := verifyKeys(args.Span.Key, args.Span.EndKey, true); err != nil {
		return roachpb.NewError(err)
	}

	repl, err := s.GetReplica(args.RangeID)
	if err != nil {
		return roachpb.NewError(err)
	}
	if !repl.IsInitialized() {
		repl.mu.RLock()
		replicaID := repl.mu.replicaID
		repl.mu.RUnlock()

		// The replica has no state yet; point the client elsewhere.
		// NOTE(review): creatingReplica is only a hint, not a verified
		// lease holder.
		return roachpb.NewError(&roachpb.NotLeaseHolderError{
			RangeID:     args.RangeID,
			LeaseHolder: repl.creatingReplica,
			Replica: roachpb.ReplicaDescriptor{
				NodeID:    repl.store.nodeDesc.NodeID,
				StoreID:   repl.store.StoreID(),
				ReplicaID: replicaID,
			},
		})
	}
	return repl.RangeFeed(args, stream)
}
func (ls *Stores) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { ctx := stream.Context() if args.RangeID == 0 { log.Fatal(ctx, "rangefeed request missing range ID") } else if args.Replica.StoreID == 0 { log.Fatal(ctx, "rangefeed request missing store ID") }
store, err := ls.GetStore(args.Replica.StoreID) if err != nil { return roachpb.NewError(err) }
return store.RangeFeed(args, stream) }
func (n *Node) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) error { pErr := n.stores.RangeFeed(args, stream) if pErr != nil { var event roachpb.RangeFeedEvent event.SetValue(&roachpb.RangeFeedError{ Error: *pErr, }) return stream.Send(&event) } return nil }
// RangeFeed bridges the server-side RangeFeed implementation to the client
// streaming interface for in-process calls. The server runs in a goroutine
// that feeds the adapter's channels; a nil server return is normalized to
// io.EOF so the client sees a conventional end-of-stream.
func (a internalClientAdapter) RangeFeed(
	ctx context.Context, args *roachpb.RangeFeedRequest, _ ...grpc.CallOption,
) (roachpb.Internal_RangeFeedClient, error) {
	ctx, cancel := context.WithCancel(ctx)
	rfAdapter := rangeFeedClientAdapter{
		ctx:    ctx,
		eventC: make(chan *roachpb.RangeFeedEvent, 128),
		errC:   make(chan error, 1),
	}

	go func() {
		// Cancel the context when the server side returns, releasing any
		// consumer blocked on the adapter.
		defer cancel()
		err := a.InternalServer.RangeFeed(args, rfAdapter)
		if err == nil {
			err = io.EOF
		}
		// errC is buffered (capacity 1) so this send never blocks.
		rfAdapter.errC <- err
	}()

	return rfAdapter, nil
}
// singleRangeFeed runs a rangefeed against a single range, trying each
// replica in (latency-)optimized order until the transport is exhausted. It
// forwards args.Timestamp as checkpoints covering the whole span arrive, and
// returns that high-water timestamp together with the terminal error so the
// caller can resume from where the feed left off.
func (ds *DistSender) singleRangeFeed(
	ctx context.Context,
	span roachpb.Span,
	ts hlc.Timestamp,
	withDiff bool,
	desc *roachpb.RangeDescriptor,
	eventCh chan<- *roachpb.RangeFeedEvent,
) (hlc.Timestamp, *roachpb.Error) {
	args := roachpb.RangeFeedRequest{
		Span: span,
		Header: roachpb.Header{
			Timestamp: ts,
			RangeID:   desc.RangeID,
		},
		WithDiff: withDiff,
	}

	// Order replicas by measured latency when clock data is available.
	var latencyFn LatencyFunc
	if ds.rpcContext != nil {
		latencyFn = ds.rpcContext.RemoteClocks.Latency
	}
	replicas := NewReplicaSlice(ds.gossip, desc)
	replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)

	transport, err := ds.transportFactory(SendOptions{}, ds.nodeDialer, replicas)
	if err != nil {
		return args.Timestamp, roachpb.NewError(err)
	}

	for {
		if transport.IsExhausted() {
			return args.Timestamp, roachpb.NewError(roachpb.NewSendError(
				fmt.Sprintf("sending to all %d replicas failed", len(replicas)),
			))
		}

		args.Replica = transport.NextReplica()
		clientCtx, client, err := transport.NextInternalClient(ctx)
		if err != nil {
			// Dial failure: try the next replica.
			log.VErrEventf(ctx, 2, "RPC error: %s", err)
			continue
		}
		stream, err := client.RangeFeed(clientCtx, &args)
		if err != nil {
			log.VErrEventf(ctx, 2, "RPC error: %s", err)
			continue
		}
		// Consume the stream until it ends or fails.
		for {
			event, err := stream.Recv()
			if err == io.EOF {
				return args.Timestamp, nil
			}
			if err != nil {
				return args.Timestamp, roachpb.NewError(err)
			}
			switch t := event.GetValue().(type) {
			case *roachpb.RangeFeedCheckpoint:
				// Only checkpoints covering the full requested span can
				// safely advance the resume timestamp.
				if t.Span.Contains(args.Span) {
					args.Timestamp.Forward(t.ResolvedTS)
				}
			case *roachpb.RangeFeedError:
				log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError())
				return args.Timestamp, &t.Error
			}
			select {
			case eventCh <- event:
			case <-ctx.Done():
				return args.Timestamp, roachpb.NewError(ctx.Err())
			}
		}
	}
}
// partialRangeFeed maintains the rangefeed for a single range slice,
// restarting it from the last checkpoint on retryable errors. On range
// boundary changes (split/merge/key-mismatch) it re-divides the span and
// re-dispatches through rangeCh instead of retrying in place.
func (ds *DistSender) partialRangeFeed(
	ctx context.Context,
	rangeInfo *singleRangeInfo,
	withDiff bool,
	rangeCh chan<- singleRangeInfo,
	eventCh chan<- *roachpb.RangeFeedEvent,
) error {
	// Bound the partial rangefeed to the partial span.
	span := rangeInfo.rs.AsRawSpanWithNoLocals()
	ts := rangeInfo.ts

	// Start a retry loop for sending the batch to the range.
	for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
		// Re-lookup the descriptor if a prior attempt invalidated it.
		if rangeInfo.desc == nil {
			var err error
			rangeInfo.desc, rangeInfo.token, err = ds.getDescriptor(ctx, rangeInfo.rs.Key, nil, false)
			if err != nil {
				log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
				continue
			}
		}

		// Establish a RangeFeed for a single Range.
		maxTS, pErr := ds.singleRangeFeed(ctx, span, ts, withDiff, rangeInfo.desc, eventCh)

		// Carry the checkpoint forward so the next attempt resumes from it.
		ts.Forward(maxTS)

		if pErr != nil {
			if log.V(1) {
				log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v",
					span, timeutil.Since(ts.GoTime()), pErr)
			}
			switch t := pErr.GetDetail().(type) {
			case *roachpb.SendError, *roachpb.RangeNotFoundError:
				// Stale descriptor: evict from the cache and re-lookup.
				if err := rangeInfo.token.Evict(ctx); err != nil {
					return err
				}
				rangeInfo.desc = nil
				continue
			case *roachpb.RangeKeyMismatchError:
				// Range boundaries moved; re-divide the span.
				if err := rangeInfo.token.Evict(ctx); err != nil {
					return err
				}
				return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh)
			case *roachpb.RangeFeedRetryError:
				switch t.Reason {
				case roachpb.RangeFeedRetryError_REASON_REPLICA_REMOVED,
					roachpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT,
					roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING,
					roachpb.RangeFeedRetryError_REASON_SLOW_CONSUMER:
					// Transient: retry the same range from the checkpoint.
					continue
				case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
					roachpb.RangeFeedRetryError_REASON_RANGE_MERGED:
					// Boundary change: re-divide the span.
					if err := rangeInfo.token.Evict(ctx); err != nil {
						return err
					}
					return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh)
				default:
					log.Fatalf(ctx, "unexpected RangeFeedRetryError reason %v", t.Reason)
				}
			default:
				// Non-retryable: propagate to the caller.
				return t
			}
		}
	}
	return nil
}
// RangeFeed divides the requested span across ranges and runs a
// partialRangeFeed per range, fanning events into eventCh. rangeCh carries
// both the initial division and any re-divisions caused by splits/merges; a
// dedicated goroutine spawns a feed for each entry. Blocks until any feed
// fails or ctx is canceled.
func (ds *DistSender) RangeFeed(
	ctx context.Context,
	args *roachpb.RangeFeedRequest,
	withDiff bool,
	eventCh chan<- *roachpb.RangeFeedEvent,
) error {
	ctx = ds.AnnotateCtx(ctx)
	ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender")
	defer sp.Finish()

	startRKey, err := keys.Addr(args.Span.Key)
	if err != nil {
		return err
	}
	endRKey, err := keys.Addr(args.Span.EndKey)
	if err != nil {
		return err
	}
	rs := roachpb.RSpan{Key: startRKey, EndKey: endRKey}

	g := ctxgroup.WithContext(ctx)
	// Buffered so dividers can enqueue a few ranges without blocking on the
	// spawner goroutine.
	rangeCh := make(chan singleRangeInfo, 16)
	g.GoCtx(func(ctx context.Context) error {
		for {
			select {
			case sri := <-rangeCh:
				// Spawn a child goroutine to process this feed.
				g.GoCtx(func(ctx context.Context) error {
					return ds.partialRangeFeed(ctx, &sri, withDiff, rangeCh, eventCh)
				})
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Kick off the initial set of ranges.
	g.GoCtx(func(ctx context.Context) error {
		return ds.divideAndSendRangeFeedToRanges(ctx, rs, args.Timestamp, rangeCh)
	})

	return g.Wait()
}
// rangefeedImplIter runs one iteration of the changefeed poller using
// rangefeeds as the data source. It optionally performs a backfill scan at a
// schema-change boundary, then runs a DistSender rangefeed per watched span,
// buffering KVs/resolved timestamps through memBuf and pushing them into the
// poller's sink until the span frontier reaches the next scan boundary.
// i is the iteration number; iteration 0 is the initial scan.
func (p *poller) rangefeedImplIter(ctx context.Context, i int) error {
	// Determine whether to request the previous value of each update from
	// RangeFeed based on whether the `diff` option is specified.
	_, withDiff := p.details.Opts[optDiff]

	p.mu.Lock()
	lastHighwater := p.mu.highWater
	p.mu.Unlock()
	// Ensure table metadata is available up to the resume timestamp before
	// emitting anything at or after it.
	if err := p.tableHist.WaitForTS(ctx, lastHighwater); err != nil {
		return err
	}

	spans, err := getSpansToProcess(ctx, p.db, p.spans)
	if err != nil {
		return err
	}

	// If the high-water mark sits exactly on a scan boundary (e.g. a schema
	// change), consume that boundary and run a full scan at it. The initial
	// scan (i == 0) never uses diffs.
	initialScan := i == 0
	backfillWithDiff := !initialScan && withDiff
	var scanTime hlc.Timestamp
	p.mu.Lock()
	if len(p.mu.scanBoundaries) > 0 && p.mu.scanBoundaries[0].Equal(p.mu.highWater) {
		scanTime = p.mu.scanBoundaries[0]
		p.mu.scanBoundaries = p.mu.scanBoundaries[1:]
	}
	p.mu.Unlock()
	if scanTime != (hlc.Timestamp{}) {
		if err := p.exportSpansParallel(ctx, spans, scanTime, backfillWithDiff); err != nil {
			return err
		}
	}
	sender := p.db.NonTransactionalSender()
	ds := sender.(*client.CrossRangeTxnWrapperSender).Wrapped().(*kv.DistSender)
	g := ctxgroup.WithContext(ctx)
	eventC := make(chan *roachpb.RangeFeedEvent, 128)

	// To avoid blocking raft, RangeFeed puts all entries in a server side
	// buffer, but to keep things simple, it's a small fixed-sized buffer.
	// This means we need to ingest everything we get back as quickly as
	// possible, so we throw it in a buffer here to pick up the slack
	// between RangeFeed and the sink.
	memBuf := makeMemBuffer(p.mm.MakeBoundAccount(), p.metrics)
	defer memBuf.Close(ctx)

	// Tracks the minimum resolved timestamp across all watched spans.
	frontier := makeSpanFrontier(spans...)

	rangeFeedStartTS := lastHighwater
	for _, span := range p.spans {
		req := &roachpb.RangeFeedRequest{
			Header: roachpb.Header{
				Timestamp: lastHighwater,
			},
			Span: span,
		}
		frontier.Forward(span, rangeFeedStartTS)
		g.GoCtx(func(ctx context.Context) error {
			return ds.RangeFeed(ctx, req, withDiff, eventC)
		})
	}
	// Drain rangefeed events into memBuf as fast as possible.
	g.GoCtx(func(ctx context.Context) error {
		for {
			select {
			case e := <-eventC:
				switch t := e.GetValue().(type) {
				case *roachpb.RangeFeedValue:
					kv := roachpb.KeyValue{Key: t.Key, Value: t.Value}
					var prevVal roachpb.Value
					if withDiff {
						prevVal = t.PrevValue
					}
					if err := memBuf.AddKV(ctx, kv, prevVal, hlc.Timestamp{}); err != nil {
						return err
					}
				case *roachpb.RangeFeedCheckpoint:
					// Checkpoints from before the feed's start carry no
					// information; drop them.
					if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(rangeFeedStartTS) {
						continue
					}
					if err := memBuf.AddResolved(ctx, t.Span, t.ResolvedTS); err != nil {
						return err
					}
				default:
					log.Fatalf(ctx, "unexpected RangeFeedEvent variant %v", t)
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
	// Consume memBuf, enforcing scan boundaries, into the poller's sink.
	g.GoCtx(func(ctx context.Context) error {
		for {
			e, err := memBuf.Get(ctx)
			if err != nil {
				return err
			}
			if e.kv.Key != nil {
				if err := p.tableHist.WaitForTS(ctx, e.kv.Value.Timestamp); err != nil {
					return err
				}
				// KVs strictly past the next boundary belong to the next
				// iteration (they will be picked up by the backfill scan).
				pastBoundary := false
				p.mu.Lock()
				if len(p.mu.scanBoundaries) > 0 && p.mu.scanBoundaries[0].Less(e.kv.Value.Timestamp) {
					pastBoundary = true
				}
				p.mu.Unlock()
				if pastBoundary {
					continue
				}
				if err := p.buf.AddKV(ctx, e.kv, e.prevVal, e.schemaTimestamp, hlc.Timestamp{}); err != nil {
					return err
				}
			} else if e.resolved != nil {
				resolvedTS := e.resolved.Timestamp
				boundaryBreak := false
				if err := p.tableHist.WaitForTS(ctx, resolvedTS); err != nil {
					return err
				}
				// Clamp resolved timestamps to the next scan boundary.
				p.mu.Lock()
				if len(p.mu.scanBoundaries) > 0 && !resolvedTS.Less(p.mu.scanBoundaries[0]) {
					boundaryBreak = true
					resolvedTS = p.mu.scanBoundaries[0]
				}
				p.mu.Unlock()
				if boundaryBreak {
					// Stop just before the boundary; once the whole frontier
					// reaches it, end this iteration.
					resolvedTS = resolvedTS.Prev()
					frontier.Forward(e.resolved.Span, resolvedTS)
					if frontier.Frontier() == resolvedTS {
						return errBoundaryReached
					}
				} else {
					if err := p.buf.AddResolved(ctx,
						e.resolved.Span, resolvedTS); err != nil {
						return err
					}
				}
			}
		}
	})
	if err := g.Wait(); err != nil && err != errBoundaryReached {
		return err
	}
	// errBoundaryReached: advance the high water to the boundary so the next
	// iteration consumes it. NOTE(review): assumes scanBoundaries is
	// non-empty here, which boundaryBreak above guarantees on this path.
	p.mu.Lock()
	p.mu.highWater = p.mu.scanBoundaries[0]
	p.mu.Unlock()
	return nil
}
|