From b1ae24df65e4cd5b8e1dd20068f7af5734194a4a Mon Sep 17 00:00:00 2001
From: Kohei Tokunaga
Date: Fri, 30 Jun 2023 21:17:46 +0900
Subject: [PATCH] Add walker utility usable for debugging LLB

Walker is an utility for inspecting and debugging each vertex in an LLB graph.

Signed-off-by: Kohei Tokunaga
---
 util/walker/breakpoints.go | 221 +++++++++++++++++++++++++++++++++++++
 util/walker/controller.go  | 179 ++++++++++++++++++++++++++++++
 util/walker/walker.go      | 190 +++++++++++++++++++++++++++++++
 3 files changed, 590 insertions(+)
 create mode 100644 util/walker/breakpoints.go
 create mode 100644 util/walker/controller.go
 create mode 100644 util/walker/walker.go

diff --git a/util/walker/breakpoints.go b/util/walker/breakpoints.go
new file mode 100644
index 00000000..0d84eabb
--- /dev/null
+++ b/util/walker/breakpoints.go
@@ -0,0 +1,221 @@
+package walker
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"sync"
+	"sync/atomic"
+
+	"github.com/moby/buildkit/client/llb"
+	solverpb "github.com/moby/buildkit/solver/pb"
+	"github.com/pkg/errors"
+)
+
+// Breakpoint is a single condition on which the walker stops.
+type Breakpoint interface {
+	// IsTarget reports whether the walker should break on the vertex of st.
+	// contErr is the error returned by the on-vertex handler for that vertex
+	// (nil when the handler succeeded). hitLocations lists the source ranges
+	// that caused the hit, if any.
+	IsTarget(ctx context.Context, st llb.State, contErr error) (yes bool, hitLocations []*solverpb.Range, err error)
+	// IsMarked reports whether this breakpoint is set on the given source line.
+	IsMarked(line int64) bool
+	// String returns a human-readable description of the breakpoint.
+	String() string
+	// Init resets the breakpoint; NewWalker calls it on every registered
+	// breakpoint when a walker is created.
+	Init()
+}
+
+// Breakpoints is a keyed collection of Breakpoint values.
+type Breakpoints struct {
+	// isBreakAllNode, when set, makes every vertex a break target.
+	isBreakAllNode atomic.Bool
+
+	// breakpoints maps a key to its breakpoint; the map is created lazily by Add.
+	breakpoints   map[string]Breakpoint
+	breakpointsMu sync.Mutex
+
+	// breakpointIdx is the counter used to generate keys when Add is called
+	// with an empty key; it is accessed with sync/atomic functions.
+	breakpointIdx int64
+}
+
+// NewBreakpoints returns a collection that holds no breakpoints yet.
+func NewBreakpoints() *Breakpoints {
+	return new(Breakpoints)
+}
+
+// Add adds a breakpoint with the specified key.
+// If key is empty, a sequential numeric key is generated. It returns the key under which bp was
+// registered, or an error if that key is already in use.
+func (b *Breakpoints) Add(key string, bp Breakpoint) (string, error) {
+	b.breakpointsMu.Lock()
+	defer b.breakpointsMu.Unlock()
+	if b.breakpoints == nil {
+		b.breakpoints = make(map[string]Breakpoint)
+	}
+	if key == "" {
+		key = fmt.Sprintf("%d", atomic.AddInt64(&b.breakpointIdx, 1))
+	}
+	if existing, ok := b.breakpoints[key]; ok {
+		// Report the breakpoint that already owns the key, not the whole collection.
+		return "", errors.Errorf("breakpoint %q already exists: %v", key, existing)
+	}
+	b.breakpoints[key] = bp
+	return key, nil
+}
+
+// Clear removes the specified breakpoint. Clearing an unknown key is a no-op.
+func (b *Breakpoints) Clear(key string) {
+	b.breakpointsMu.Lock()
+	defer b.breakpointsMu.Unlock()
+	delete(b.breakpoints, key)
+}
+
+// ClearAll removes all breakpoints and restarts the generated-key counter.
+func (b *Breakpoints) ClearAll() {
+	b.breakpointsMu.Lock()
+	defer b.breakpointsMu.Unlock()
+	b.breakpoints = nil
+	atomic.StoreInt64(&b.breakpointIdx, 0)
+}
+
+// ForEach calls f on each breakpoint in ascending key order and stops early
+// when f returns false.
+//
+// f is invoked on a snapshot taken under the lock, so it may call Add, Clear
+// or ClearAll without deadlocking; such changes do not affect the ongoing
+// iteration.
+func (b *Breakpoints) ForEach(f func(key string, bp Breakpoint) bool) {
+	b.breakpointsMu.Lock()
+	keys := make([]string, 0, len(b.breakpoints))
+	snapshot := make(map[string]Breakpoint, len(b.breakpoints))
+	for k, bp := range b.breakpoints {
+		keys = append(keys, k)
+		snapshot[k] = bp
+	}
+	b.breakpointsMu.Unlock()
+
+	sort.Strings(keys)
+	for _, k := range keys {
+		if !f(k, snapshot[k]) {
+			return
+		}
+	}
+}
+
+// BreakAllNode configures whether every node is treated as a break target.
+func (b *Breakpoints) BreakAllNode(v bool) {
+	b.isBreakAllNode.Store(v)
+}
+
+// isBreakpoint reports whether the walker must break on the vertex of st.
+// handleErr is the on-vertex handler's error and is forwarded to every
+// breakpoint. The returned map holds, per breakpoint key, the ranges that
+// were hit; it is nil when break-all-node mode is on or nothing was hit.
+func (b *Breakpoints) isBreakpoint(ctx context.Context, st llb.State, handleErr error) (bool, map[string][]*solverpb.Range, error) {
+	if b.isBreakAllNode.Load() {
+		return true, nil, nil
+	}
+
+	b.breakpointsMu.Lock()
+	defer b.breakpointsMu.Unlock()
+	hits := make(map[string][]*solverpb.Range)
+	for k, bp := range b.breakpoints {
+		isBreak, bhits, err := bp.IsTarget(ctx, st, handleErr)
+		if err != nil {
+			return false, nil, err
+		}
+		if isBreak {
+			hits[k] = append(hits[k], bhits...)
+		}
+	}
+	if len(hits) > 0 {
+		return true, hits, nil
+	}
+	return false, nil, nil
+}
+
+// NewLineBreakpoint returns a breakpoint to break on the specified line.
+func NewLineBreakpoint(line int64) Breakpoint {
+	return &lineBreakpoint{line}
+}
+
+// lineBreakpoint breaks on vertices whose source ranges cover a given line.
+type lineBreakpoint struct {
+	line int64
+}
+
+// Init is a no-op; a line breakpoint carries no per-walk state.
+func (b *lineBreakpoint) Init() {}
+
+// IsTarget reports whether any source range of the vertex of st contains the
+// configured line, and returns the distinct matching ranges.
+func (b *lineBreakpoint) IsTarget(ctx context.Context, st llb.State, _ error) (yes bool, hitLocations []*solverpb.Range, err error) {
+	_, _, _, sources, err := st.Output().Vertex(ctx, nil).Marshal(ctx, nil)
+	if err != nil {
+		return false, nil, err
+	}
+	// Collect by value so that identical ranges are reported only once.
+	hits := make(map[solverpb.Range]struct{})
+	line := b.line
+	for _, loc := range sources {
+		for _, r := range loc.Ranges {
+			if int64(r.Start.Line) <= line && line <= int64(r.End.Line) {
+				hits[*r] = struct{}{}
+			}
+		}
+	}
+	if len(hits) == 0 {
+		return false, nil, nil
+	}
+	ret := make([]*solverpb.Range, 0, len(hits))
+	for r := range hits {
+		// Take a per-iteration copy: before Go 1.22 the loop variable is
+		// shared, so &r would make every element point at the same Range.
+		r := r
+		ret = append(ret, &r)
+	}
+	return true, ret, nil
+}
+
+// IsMarked reports whether line is the line this breakpoint is set on.
+func (b *lineBreakpoint) IsMarked(line int64) bool {
+	return line == b.line
+}
+
+// String describes the breakpoint as "line: N".
+func (b *lineBreakpoint) String() string {
+	return fmt.Sprintf("line: %d", b.line)
+}
+
+// NewStopOnEntryBreakpoint returns a breakpoint that breaks at the first node.
+func NewStopOnEntryBreakpoint() Breakpoint {
+	b := stopOnEntryBreakpoint(true)
+	return &b
+}
+
+// stopOnEntryBreakpoint is true while the breakpoint is still armed.
+type stopOnEntryBreakpoint bool
+
+// Init re-arms the breakpoint.
+func (b *stopOnEntryBreakpoint) Init() {
+	*b = true
+}
+
+// IsTarget reports a hit on the first call after Init and disarms itself.
+func (b *stopOnEntryBreakpoint) IsTarget(ctx context.Context, st llb.State, _ error) (yes bool, hitLocations []*solverpb.Range, err error) {
+	if *b {
+		*b = false // stop only once
+		return true, nil, nil
+	}
+	return false, nil, nil
+}
+
+// IsMarked always returns false; this breakpoint is not tied to a line.
+func (b *stopOnEntryBreakpoint) IsMarked(line int64) bool {
+	return false
+}
+
+// String describes the breakpoint.
+func (b *stopOnEntryBreakpoint) String() string {
+	return "stop on entry"
+}
+
+// NewOnErrorBreakpoint returns a breakpoint that breaks when an error is observed.
+func NewOnErrorBreakpoint() Breakpoint {
+	return &onErrorBreakpoint{}
+}
+
+// onErrorBreakpoint breaks on every vertex whose on-vertex handler failed.
+type onErrorBreakpoint struct{}
+
+// Init is a no-op; the breakpoint is stateless.
+func (b *onErrorBreakpoint) Init() {}
+
+// IsTarget reports a hit whenever handleErr is non-nil and returns the
+// distinct source ranges of the vertex of st (nil if it has none).
+func (b *onErrorBreakpoint) IsTarget(ctx context.Context, st llb.State, handleErr error) (yes bool, hitLocations []*solverpb.Range, err error) {
+	if handleErr == nil {
+		return false, nil, nil
+	}
+	_, _, _, sources, err := st.Output().Vertex(ctx, nil).Marshal(ctx, nil)
+	if err != nil {
+		return false, nil, err
+	}
+	// Collect by value so that identical ranges are reported only once.
+	hits := make(map[solverpb.Range]struct{})
+	for _, loc := range sources {
+		for _, r := range loc.Ranges {
+			hits[*r] = struct{}{}
+		}
+	}
+	var ret []*solverpb.Range
+	for r := range hits {
+		// Take a per-iteration copy: before Go 1.22 the loop variable is
+		// shared, so &r would make every element point at the same Range.
+		r := r
+		ret = append(ret, &r)
+	}
+	return true, ret, nil
+}
+
+// IsMarked always returns false; this breakpoint is not tied to a line.
+func (b *onErrorBreakpoint) IsMarked(line int64) bool {
+	return false
+}
+
+// String describes the breakpoint.
+func (b *onErrorBreakpoint) String() string {
+	return "stop on error"
+}
diff --git a/util/walker/controller.go b/util/walker/controller.go
new file mode 100644
index 00000000..08a55534
--- /dev/null
+++ b/util/walker/controller.go
@@ -0,0 +1,179 @@
+package walker
+
+import (
+	"context"
+	"sync"
+
+	solverpb "github.com/moby/buildkit/solver/pb"
+	"github.com/pkg/errors"
+)
+
+// Controller is a utility to control walkers with a debugger-like interface such as "continue" and "next".
+type Controller struct {
+	// def is the definition walked by StartWalk.
+	def         *solverpb.Definition
+	breakpoints *Breakpoints
+
+	// breakHandler is called on each break, onVertexHandler on each vertex,
+	// and onWalkDoneFunc with the result of a finished walk.
+	breakHandler    BreakHandlerFunc
+	onVertexHandler OnVertexHandlerFunc
+	onWalkDoneFunc  func(error)
+
+	// walker is the on-going walker, or nil when none is running.
+	walker     *Walker
+	walkerMu   sync.Mutex
+	walkCancel func()
+
+	// curStepDoneCh is closed by Continue and Next to release the walker
+	// from the break it is currently waiting on.
+	curStepDoneCh chan struct{}
+
+	// curWalkErrCh carries the error of the current walk and curWalkDoneCh
+	// is closed when the walk goroutine finishes; WalkCancel waits on them.
+	curWalkErrCh  chan error
+	curWalkDoneCh chan struct{}
+}
+
+// Status is a status of the controller.
+type Status struct {
+	// Definition is the target definition where walking is performed.
+	Definition *solverpb.Definition
+
+	// Cursors is current cursor positions on the walker.
+	Cursors []solverpb.Range
+}
+
+// NewController returns a walker controller.
+// breakHandler is invoked on each break, onVertexHandler on each vertex, and onWalkDoneFunc with the
+// result of a walk once it has finished.
+func NewController(def *solverpb.Definition, breakpoints *Breakpoints, breakHandler BreakHandlerFunc, onVertexHandler OnVertexHandlerFunc, onWalkDoneFunc func(error)) *Controller {
+	c := new(Controller)
+	c.def = def
+	c.breakpoints = breakpoints
+	c.breakHandler = breakHandler
+	c.onVertexHandler = onVertexHandler
+	c.onWalkDoneFunc = onWalkDoneFunc
+	return c
+}
+
+// Breakpoints returns the set of breakpoints this controller was created with.
+func (c *Controller) Breakpoints() *Breakpoints {
+	return c.breakpoints
+}
+
+// Inspect returns a snapshot of the controller: the target definition and,
+// when a walker is running, its current cursors.
+func (c *Controller) Inspect() *Status {
+	status := &Status{Definition: c.def}
+
+	c.walkerMu.Lock()
+	defer c.walkerMu.Unlock()
+	if w := c.walker; w != nil {
+		status.Cursors = w.GetCursors()
+	}
+	return status
+}
+
+// IsStarted reports whether there is an on-going walker.
+func (c *Controller) IsStarted() bool {
+	c.walkerMu.Lock()
+	defer c.walkerMu.Unlock()
+	return c.walker != nil
+}
+
+// StartWalk starts walking in a goroutine. It returns immediately without waiting for the walk to
+// complete. Concurrent walks aren't supported: an error is returned if there is an on-going walker,
+// and a previous walk must be canceled with WalkCancel first.
+// The walker is registered before StartWalk returns, so IsStarted reports true right away.
+func (c *Controller) StartWalk() error {
+	c.walkerMu.Lock()
+	defer c.walkerMu.Unlock()
+	if c.walker != nil {
+		return errors.Errorf("walker already running")
+	}
+
+	w := NewWalker(c.breakpoints, func(ctx context.Context, bCtx *BreakContext) error {
+		if err := c.breakHandler(ctx, bCtx); err != nil {
+			return err
+		}
+		// Publish the step channel under walkerMu: Continue and Next read
+		// and close it while holding the same lock.
+		stepDoneCh := make(chan struct{})
+		c.walkerMu.Lock()
+		c.curStepDoneCh = stepDoneCh
+		c.walkerMu.Unlock()
+		select {
+		case <-stepDoneCh:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+		return nil
+	}, c.onVertexHandler)
+
+	// Set up all per-walk state before the goroutine starts so that a
+	// WalkCancel issued right after StartWalk sees a complete picture.
+	ctx, cancel := context.WithCancel(context.TODO())
+	// Buffered so that the walk goroutine can finish even when nobody
+	// calls WalkCancel to collect the error.
+	errCh := make(chan error, 1)
+	doneCh := make(chan struct{})
+	c.walker, c.walkCancel = w, cancel
+	c.curWalkErrCh, c.curWalkDoneCh = errCh, doneCh
+
+	go func() {
+		defer cancel() // release the context once the walk is over
+		defer close(doneCh)
+
+		err := w.Walk(ctx, c.def)
+		c.onWalkDoneFunc(err)
+
+		// Unregister the walker before closing it so that Continue and
+		// Next can never reach a closed walker. WalkCancel may already
+		// have cleared the field and a new walk may have been started,
+		// hence the identity check.
+		c.walkerMu.Lock()
+		if c.walker == w {
+			c.walker = nil
+		}
+		c.walkerMu.Unlock()
+		w.Close()
+
+		if err != nil {
+			errCh <- err
+		}
+	}()
+
+	return nil
+}
+
+// WalkCancel cancels the on-going walk, waits for it to finish and returns
+// the error the walk ended with, if any. It returns nil when no walker is
+// running.
+func (c *Controller) WalkCancel() error {
+	c.walkerMu.Lock()
+	if c.walker == nil {
+		c.walkerMu.Unlock()
+		return nil
+	}
+	c.walker = nil
+	cancel, errCh, doneCh := c.walkCancel, c.curWalkErrCh, c.curWalkDoneCh
+	c.walkerMu.Unlock()
+
+	cancel()
+	<-doneCh
+	// The error, if any, was queued before doneCh was closed.
+	select {
+	case err := <-errCh:
+		return err
+	default:
+		return nil
+	}
+}
+
+// Continue resumes the walker. The walker will stop at the next breakpoint.
+func (c *Controller) Continue() {
+	c.walkerMu.Lock()
+	defer c.walkerMu.Unlock()
+	if c.walker != nil {
+		c.walker.BreakAllNode(false)
+	}
+	if c.curStepDoneCh != nil {
+		// Closing the channel releases the break handler that waits on it.
+		close(c.curStepDoneCh)
+		c.curStepDoneCh = nil
+	}
+}
+
+// Next resumes the walker. The walker will stop at the next vertex.
+// It returns an error if no walker is running.
+func (c *Controller) Next() error {
+	c.walkerMu.Lock()
+	defer c.walkerMu.Unlock()
+
+	if c.walker == nil {
+		return errors.Errorf("walker isn't running")
+	}
+	c.walker.BreakAllNode(true)
+
+	// Release the break the walker is currently waiting on, if any.
+	if ch := c.curStepDoneCh; ch != nil {
+		close(ch)
+		c.curStepDoneCh = nil
+	}
+	return nil
+}
+
+// Close closes this controller by canceling the on-going walk, if any.
+func (c *Controller) Close() error {
+	return c.WalkCancel()
+}
diff --git a/util/walker/walker.go b/util/walker/walker.go
new file mode 100644
index 00000000..5cbc472c
--- /dev/null
+++ b/util/walker/walker.go
@@ -0,0 +1,190 @@
+package walker
+
+import (
+	"context"
+	"sync"
+
+	"github.com/moby/buildkit/client/llb"
+	solverpb "github.com/moby/buildkit/solver/pb"
+	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
+)
+
+// BreakContext contains information about the current break.
+type BreakContext struct {
+	// State is the LLB state of the vertex the walker stopped on.
+	State llb.State
+
+	// Cursors are the current cursor locations of the walker.
+	Cursors []solverpb.Range
+
+	// Hits holds, per breakpoint key, the ranges that were hit.
+	Hits map[string][]*solverpb.Range
+
+	// Definition is the definition of the LLB being walked.
+	Definition *solverpb.Definition
+
+	// Breakpoints is the set of registered breakpoints.
+	Breakpoints *Breakpoints
+}
+
+// BreakHandlerFunc is a callback function called on each break.
+type BreakHandlerFunc func(ctx context.Context, bCtx *BreakContext) error
+
+// OnVertexHandlerFunc is a callback function called on each vertex.
+type OnVertexHandlerFunc func(ctx context.Context, st llb.State) error
+
+// Walker walks an LLB tree from the leaves to the root. Can be controlled using breakpoints.
+type Walker struct {
+	breakHandler BreakHandlerFunc
+	// breakpoints is set to nil by Close.
+	breakpoints *Breakpoints
+	closed      bool
+	// cursors counts, per source range, how many vertices currently being
+	// processed cover it; guarded by cursorsMu.
+	cursors   map[solverpb.Range]int
+	cursorsMu sync.Mutex
+
+	// mu serializes the per-vertex processing done by walk.
+	mu sync.Mutex
+
+	// inputsMu serializes inputsOfState.
+	inputsMu sync.Mutex
+
+	onVertexHandler OnVertexHandlerFunc
+}
+
+// NewWalker returns a walker configured with the breakpoints and breakpoint handler.
+// Every registered breakpoint is re-initialized via Init.
+// onVertexHandler is called on each vertex, including those that are not break targets.
+func NewWalker(bps *Breakpoints, breakHandler BreakHandlerFunc, onVertexHandler OnVertexHandlerFunc) *Walker {
+	bps.ForEach(func(key string, bp Breakpoint) bool {
+		bp.Init()
+		return true
+	})
+	return &Walker{
+		breakHandler:    breakHandler,
+		breakpoints:     bps,
+		onVertexHandler: onVertexHandler,
+	}
+}
+
+// Close closes the walker and drops its reference to the breakpoints.
+// It performs no locking, so it must not be called while Walk is running.
+func (w *Walker) Close() {
+	w.closed = true
+	w.breakpoints = nil
+}
+
+// GetCursors returns the positions the walker is currently looking at,
+// in no particular order.
+func (w *Walker) GetCursors() (res []solverpb.Range) {
+	w.cursorsMu.Lock()
+	defer w.cursorsMu.Unlock()
+	for r, i := range w.cursors {
+		if i > 0 {
+			res = append(res, r)
+		}
+	}
+	return res
+}
+
+// BreakAllNode configures whether the walker breaks on each vertex.
+// It is a no-op once the walker has been closed.
+func (w *Walker) BreakAllNode(v bool) {
+	// Close clears the field; without this guard a call after Close would
+	// dereference a nil *Breakpoints and panic.
+	if bps := w.breakpoints; bps != nil {
+		bps.BreakAllNode(v)
+	}
+}
+
+// Walk starts walking the specified LLB from the leaves to the root.
+// A vertex is handled only after all of its inputs have been handled.
+func (w *Walker) Walk(ctx context.Context, def *solverpb.Definition) error {
+	op, err := llb.NewDefinitionOp(def)
+	if err != nil {
+		return err
+	}
+	root := llb.NewState(op)
+	return w.walk(ctx, def, root, 0)
+}
+
+// inputsOfState returns the inputs of the vertex of st while holding inputsMu.
+func (w *Walker) inputsOfState(ctx context.Context, st llb.State) []llb.Output {
+	w.inputsMu.Lock()
+	defer w.inputsMu.Unlock()
+	vtx := st.Output().Vertex(ctx, nil)
+	return vtx.Inputs()
+}
+
+// walk walks the inputs of st concurrently and then handles the vertex of st
+// itself: it calls the on-vertex handler and, if a breakpoint matches, the
+// break handler. orgDef is the definition passed to Walk and is forwarded to
+// the break handler; depth is the distance from the root vertex.
+func (w *Walker) walk(ctx context.Context, orgDef *solverpb.Definition, st llb.State, depth int) error {
+	group, groupCtx := errgroup.WithContext(ctx)
+	inputs := w.inputsOfState(ctx, st)
+	for i := range inputs {
+		input := inputs[i]
+		group.Go(func() error {
+			return w.walk(groupCtx, orgDef, llb.NewState(input), depth+1)
+		})
+	}
+	if err := group.Wait(); err != nil {
+		return err
+	}
+
+	// Only one vertex is handled at a time.
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	stDef, err := st.Marshal(ctx)
+	if err != nil {
+		return err
+	}
+	dgst, _, _, _, err := st.Output().Vertex(ctx, nil).Marshal(ctx, nil)
+	if err != nil {
+		return err
+	}
+
+	// Collect the source ranges recorded for this vertex and expose them
+	// as cursors for as long as the vertex is being handled.
+	var ranges []solverpb.Range
+	if src := stDef.Source; src != nil {
+		if locs, found := src.Locations[dgst.String()]; found {
+			for _, loc := range locs.Locations {
+				for _, r := range loc.Ranges {
+					ranges = append(ranges, *r)
+				}
+			}
+		}
+	}
+	w.addCursor(ranges)
+	defer w.removeCursor(ranges)
+
+	if w.closed {
+		return errors.Errorf("walker closed")
+	}
+
+	// The handler's error is offered to the breakpoints first and is
+	// returned only after a possible break has been handled.
+	handleErr := w.onVertexHandler(ctx, st)
+	shouldBreak, hits, err := w.breakpoints.isBreakpoint(ctx, st, handleErr)
+	if err != nil {
+		return err
+	}
+	if shouldBreak {
+		bCtx := &BreakContext{
+			State:       st,
+			Cursors:     w.GetCursors(),
+			Hits:        hits,
+			Definition:  orgDef,
+			Breakpoints: w.breakpoints,
+		}
+		if err := w.breakHandler(ctx, bCtx); err != nil {
+			return err
+		}
+	}
+	return handleErr
+}
+
+// addCursor increments the reference count of each of the given ranges.
+func (w *Walker) addCursor(ranges []solverpb.Range) {
+	w.cursorsMu.Lock()
+	defer w.cursorsMu.Unlock()
+	if len(ranges) > 0 && w.cursors == nil {
+		w.cursors = make(map[solverpb.Range]int)
+	}
+	for i := range ranges {
+		w.cursors[ranges[i]]++
+	}
+}
+
+// removeCursor decrements the reference count of each of the given ranges
+// and forgets a range once its count reaches zero.
+func (w *Walker) removeCursor(ranges []solverpb.Range) {
+	w.cursorsMu.Lock()
+	defer w.cursorsMu.Unlock()
+	if w.cursors == nil {
+		return
+	}
+	for i := range ranges {
+		key := ranges[i]
+		w.cursors[key]--
+		if w.cursors[key] == 0 {
+			delete(w.cursors, key)
+		}
+	}
+}