Use Runner

This revamps the code quite a bit. Series handling has been moved into
the gerrit client, which also handles caching.

The Runner logic itself has been greatly simplified.

The runner logic now lives in runner.go; submitqueue.go is gone.

The "per-run result object" concept has been dropped - instead we just
use annotated logs.

Also, we switched to apex/log.
Florian Klink 2019-12-02 10:00:32 +01:00
parent 7bafef7a84
commit 04a24a0c60
14 changed files with 486 additions and 537 deletions
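
As the commit message describes, the Runner now gets an apex/log logger and a gerrit client injected instead of a SubmitQueue and per-run Result objects. Below is a rough sketch of how the reworked pieces might be wired together, based on the NewRunner and Trigger signatures in this commit; the gerrit.NewClient constructor and its arguments, the submitqueue import path, the tag value and the polling interval are illustrative assumptions, not part of this change.

package main

import (
	"os"
	"time"

	"github.com/apex/log"
	"github.com/apex/log/handlers/text"

	"github.com/tweag/gerrit-queue/gerrit"
	"github.com/tweag/gerrit-queue/submitqueue"
)

func main() {
	// annotated, structured logs replace the old per-run Result object
	logger := &log.Logger{
		Handler: text.New(os.Stderr),
		Level:   log.InfoLevel,
	}

	// hypothetical constructor: the gerrit client now owns series assembly and caching
	client, err := gerrit.NewClient("https://gerrit.example.org", "user", "password", "myproject", "master")
	if err != nil {
		logger.WithError(err).Fatal("unable to construct gerrit client")
	}

	runner := submitqueue.NewRunner(logger, client, "submit_me")

	// Trigger is meant to be called periodically; it skips a run if one is already in progress
	for {
		if err := runner.Trigger(false); err != nil {
			logger.WithError(err).Warn("submit queue run skipped or failed")
		}
		time.Sleep(10 * time.Minute)
	}
}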


@@ -1,55 +0,0 @@
package submitqueue
import (
"time"
"github.com/sirupsen/logrus"
)
// Problem: no inspection during the run
// Problem: record the state
// Result contains all data necessary to inspect a previous run
// This includes the Series from that run, and all Log Entries collected.
// It also implements the interface required for logrus.Hook.
type Result struct {
LogEntries []*logrus.Entry
Series []Serie
Error error
startTime time.Time
HEAD string
}
// MakeResult produces a new Result struct,
// and initializes startTime with the current time.
func MakeResult() *Result {
return &Result{
startTime: time.Now(),
}
}
// StartTime returns the startTime
func (r Result) StartTime() time.Time {
return r.startTime
}
// EndTime returns the time of the latest log entry
func (r Result) EndTime() time.Time {
if len(r.LogEntries) == 0 {
return r.startTime
}
return r.LogEntries[len(r.LogEntries)-1].Time
}
// Fire is called by logrus on each log event,
// we collect all log entries in the struct variable
func (r *Result) Fire(entry *logrus.Entry) error {
r.LogEntries = append(r.LogEntries, entry)
return nil
}
// Levels is called by logrus to determine whether to Fire the handler.
// As we want to collect all log entries, we return logrus.AllLevels
func (r *Result) Levels() []logrus.Level {
return logrus.AllLevels
}


@@ -1,60 +1,203 @@
package submitqueue
import (
"fmt"
"sync"
"time"
"github.com/apex/log"
"github.com/tweag/gerrit-queue/gerrit"
)
// Runner supervises the submit queue and records historical data about it
// Runner is a struct existing across the lifetime of a single run of the submit queue
// it contains a mutex to avoid being run multiple times.
// In fact, it even cancels runs while another one is still in progress.
// It contains a Gerrit object facilitating access, a log object, the configured submit queue tag
// and a `wipSerie` (only populated if waiting for a rebase)
type Runner struct {
mut sync.Mutex
submitQueue *SubmitQueue
currentlyRunning *time.Time
results []*Result
currentlyRunning bool
wipSerie *gerrit.Serie
logger *log.Logger
gerrit *gerrit.Client
submitQueueTag string // the tag used to submit something to the submit queue
}
// NewRunner initializes a new runner object
func NewRunner(sq *SubmitQueue) *Runner {
// NewRunner creates a new Runner struct
func NewRunner(logger *log.Logger, gerrit *gerrit.Client, submitQueueTag string) *Runner {
return &Runner{
submitQueue: sq,
results: []*Result{},
logger: logger,
gerrit: gerrit,
submitQueueTag: submitQueueTag,
}
}
// GetState returns a copy of all the state for the frontend
func (r *Runner) GetState() (SubmitQueue, *time.Time, []*Result) {
r.mut.Lock()
defer r.mut.Unlock()
return *r.submitQueue, r.currentlyRunning, r.results
// isAutoSubmittable determines if something could be auto-submitted, potentially requiring a rebase
// for this, every changeset in the serie needs to:
// * have the auto-submit label
// * have a +2 code review
// * have a +1 from CI
func (r *Runner) isAutoSubmittable(s *gerrit.Serie) bool {
for _, c := range s.ChangeSets {
if c.Verified != 1 || c.CodeReviewed != 2 || !c.HasTag(r.submitQueueTag) {
return false
}
}
return true
}
// Trigger starts a new batch job
// TODO: make sure only one batch job is started at the same time
// if a batch job is already started, ignore the newest request
// TODO: be more granular in dry-run mode
func (r *Runner) Trigger(fetchOnly bool) {
// IsCurrentlyRunning returns true if the runner is currently running
func (r *Runner) IsCurrentlyRunning() bool {
return r.currentlyRunning
}
// GetWIPSerie returns the current wipSerie, if any, nil otherwise
// Acquires a lock, so check with IsCurrentlyRunning first
func (r *Runner) GetWIPSerie() *gerrit.Serie {
r.mut.Lock()
if r.currentlyRunning != nil {
return
defer func() {
r.mut.Unlock()
}()
return r.wipSerie
}
// Trigger gets triggered periodically
func (r *Runner) Trigger(fetchOnly bool) error {
// TODO: If CI fails, remove the auto-submit labels => rules.pl
// Only one trigger can run at the same time
r.mut.Lock()
if r.currentlyRunning {
return fmt.Errorf("Already running, skipping")
}
now := time.Now()
r.currentlyRunning = &now
r.currentlyRunning = true
r.mut.Unlock()
defer func() {
r.mut.Lock()
r.currentlyRunning = nil
r.currentlyRunning = false
r.mut.Unlock()
}()
result := r.submitQueue.Run(fetchOnly)
r.mut.Lock()
// drop tail if size > 10
if len(r.results) > 10 {
r.results = append([]*Result{result}, r.results[:9]...)
} else {
r.results = append([]*Result{result}, r.results...)
// isReady means a serie is auto-submittable and rebased on HEAD
isReady := func(s *gerrit.Serie) bool {
return r.isAutoSubmittable(s) && r.gerrit.SerieIsRebasedOnHEAD(s)
}
r.mut.Unlock()
isAwaitingCI := func(s *gerrit.Serie) bool {
for _, c := range s.ChangeSets {
if !(c.Verified == 0 && c.CodeReviewed == 2 && c.HasTag(r.submitQueueTag)) {
return false
}
}
return true
}
// Prepare the work by creating a local cache of gerrit state
r.gerrit.Refresh()
// early return if we only want to fetch
if fetchOnly {
return nil
}
if r.wipSerie != nil {
// refresh wipSerie with what it currently looks like in gerrit
wipSerie := r.gerrit.FindSerie(func(s *gerrit.Serie) bool {
// the new wipSerie needs to have the same number of changesets
if len(r.wipSerie.ChangeSets) != len(s.ChangeSets) {
return false
}
// … and the same ChangeIDs.
for idx, c := range s.ChangeSets {
if r.wipSerie.ChangeSets[idx].ChangeID != c.ChangeID {
return false
}
}
return true
})
if wipSerie == nil {
r.logger.WithField("wipSerie", r.wipSerie).Warn("wipSerie has disappeared")
r.wipSerie = nil
} else {
r.wipSerie = wipSerie
}
}
for {
// initialize logger
r.logger.Info("Running")
if r.wipSerie != nil {
// if we have a wipSerie
l := r.logger.WithField("wipSerie", r.wipSerie)
l.Info("Checking wipSerie")
if !r.gerrit.SerieIsRebasedOnHEAD(r.wipSerie) {
// check for chaos monkeys
l.Warnf("HEAD has moved to %s while still waiting for wipSerie, discarding it", r.gerrit.GetHEAD())
r.wipSerie = nil
} else if isAwaitingCI(r.wipSerie) {
// the changeset is still waiting for CI feedback
l.Info("keep waiting for wipSerie")
// break the loop, take a look at it at the next trigger.
break
} else if isReady(r.wipSerie) {
// if the WIP changeset is ready (auto submittable and rebased on HEAD), submit
for _, changeset := range r.wipSerie.ChangeSets {
_, err := r.gerrit.SubmitChangeset(changeset)
if err != nil {
l.WithField("changeset", changeset).Error("error submitting changeset")
r.wipSerie = nil
return err
}
}
r.wipSerie = nil
} else {
// should never be reached?!
}
}
r.logger.Info("Looking for series ready to submit")
// Find serie, that:
// * has the auto-submit label
// * has +2 review
// * has +1 CI
// * is rebased on master
serie := r.gerrit.FindSerie(isReady)
if serie != nil {
r.logger.WithField("serie", serie).Info("Found serie to submit, no rebase necessary")
r.wipSerie = serie
continue
}
// Find serie, that:
// * has the auto-submit label
// * has +2 review
// * has +1 CI
// * is NOT rebased on master
serie = r.gerrit.FindSerie(r.isAutoSubmittable)
if serie == nil {
r.logger.Info("nothing to do, going back to sleep.")
break
}
l := r.logger.WithField("serie", serie)
l.Info("found serie, which needs a rebase")
// TODO: move into Client.RebaseSeries function
head := r.gerrit.GetHEAD()
for _, changeset := range serie.ChangeSets {
changeset, err := r.gerrit.RebaseChangeset(changeset, head)
if err != nil {
l.Error(err.Error())
return err
}
head = changeset.CommitID
}
// it doesn't matter that this serie isn't in its rebased state yet,
// we'll refetch it at the beginning of the next trigger anyway
r.wipSerie = serie
break
}
r.logger.Info("Run complete")
return nil
}
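
With the per-run Result object gone, the only state a frontend can inspect is exposed through IsCurrentlyRunning and GetWIPSerie. Below is a minimal sketch of a status handler using them; the package name, handler shape and submitqueue import path are assumptions for illustration. It follows the doc comment above, which suggests checking IsCurrentlyRunning before calling GetWIPSerie, since the latter takes the runner's mutex.

package frontend

import (
	"fmt"
	"net/http"

	"github.com/tweag/gerrit-queue/submitqueue"
)

// StatusHandler is a hypothetical endpoint illustrating the intended calling order:
// consult IsCurrentlyRunning before GetWIPSerie, since the latter takes the runner's mutex.
func StatusHandler(runner *submitqueue.Runner) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		if runner.IsCurrentlyRunning() {
			fmt.Fprintln(w, "a submit queue run is currently in progress")
			return
		}
		if wipSerie := runner.GetWIPSerie(); wipSerie != nil {
			// wipSerie is populated while the runner still waits on a serie
			fmt.Fprintf(w, "waiting on %v\n", wipSerie)
			return
		}
		fmt.Fprintln(w, "idle, nothing queued")
	}
}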


@@ -1,116 +0,0 @@
package submitqueue
import (
"fmt"
"strings"
"github.com/tweag/gerrit-queue/gerrit"
log "github.com/sirupsen/logrus"
)
// Serie represents a list of successive changesets with an unbroken parent -> child relation,
// starting from the parent.
type Serie struct {
ChangeSets []*gerrit.Changeset
}
// GetParentCommitIDs returns the parent commit IDs
func (s *Serie) GetParentCommitIDs() ([]string, error) {
if len(s.ChangeSets) == 0 {
return nil, fmt.Errorf("Can't return parent on a serie with zero ChangeSets")
}
return s.ChangeSets[0].ParentCommitIDs, nil
}
// GetLeafCommitID returns the commit id of the last commit in ChangeSets
func (s *Serie) GetLeafCommitID() (string, error) {
if len(s.ChangeSets) == 0 {
return "", fmt.Errorf("Can't return leaf on a serie with zero ChangeSets")
}
return s.ChangeSets[len(s.ChangeSets)-1].CommitID, nil
}
// CheckIntegrity checks that the series contains a properly ordered and connected chain of commits
func (s *Serie) CheckIntegrity() error {
logger := log.WithFields(log.Fields{
"serie": s,
})
// an empty serie is invalid
if len(s.ChangeSets) == 0 {
return fmt.Errorf("An empty serie is invalid")
}
previousCommitID := ""
for i, changeset := range s.ChangeSets {
// we can't really check the parent of the first commit
// so skip verifying that one
logger.WithFields(log.Fields{
"changeset": changeset.String(),
"previousCommitID": fmt.Sprintf("%.7s", previousCommitID),
}).Debug(" - verifying changeset")
parentCommitIDs := changeset.ParentCommitIDs
if len(parentCommitIDs) == 0 {
return fmt.Errorf("Changesets without any parent are not supported")
}
// we don't check parents of the first changeset in a series
if i != 0 {
if len(parentCommitIDs) != 1 {
return fmt.Errorf("Merge commits in the middle of a series are not supported (only at the beginning)")
}
if parentCommitIDs[0] != previousCommitID {
return fmt.Errorf("changesets parent commit id doesn't match previous commit id")
}
}
// update previous commit id for the next loop iteration
previousCommitID = changeset.CommitID
}
return nil
}
// FilterAllChangesets applies a filter function on all of the changesets in the series.
// It returns true if f returns true for all changesets, false otherwise.
func (s *Serie) FilterAllChangesets(f func(c *gerrit.Changeset) bool) bool {
for _, changeset := range s.ChangeSets {
if f(changeset) == false {
return false
}
}
return true
}
func (s *Serie) String() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("Serie[%d]", len(s.ChangeSets)))
if len(s.ChangeSets) == 0 {
sb.WriteString("()\n")
return sb.String()
}
parentCommitIDs, err := s.GetParentCommitIDs()
if err == nil {
if len(parentCommitIDs) == 1 {
sb.WriteString(fmt.Sprintf("(parent: %.7s)", parentCommitIDs[0]))
} else {
sb.WriteString("(merge: ")
for i, parentCommitID := range parentCommitIDs {
sb.WriteString(fmt.Sprintf("%.7s", parentCommitID))
if i < len(parentCommitIDs)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(")")
}
}
sb.WriteString(fmt.Sprintf("(%.7s..%.7s)",
s.ChangeSets[0].CommitID,
s.ChangeSets[len(s.ChangeSets)-1].CommitID))
return sb.String()
}
func shortCommitID(commitID string) string {
return commitID[:6]
}


@@ -1,125 +0,0 @@
package submitqueue
import (
"sort"
"github.com/tweag/gerrit-queue/gerrit"
"github.com/sirupsen/logrus"
)
// AssembleSeries consumes a list of `Changeset`s and groups them together into series
//
// As we have no control over the order of the passed changesets,
// we maintain two lookup tables:
// mapLeafToSerie, which allows looking up a serie by its leaf commit id,
// so we can append to an existing serie,
// and mapParentToSeries, which allows looking up all series having a certain parent commit id,
// so we can prepend to any of the existing series.
// If we can't find anything, we create a new serie.
func AssembleSeries(changesets []*gerrit.Changeset, log *logrus.Logger) ([]*Serie, error) {
series := make([]*Serie, 0)
mapLeafToSerie := make(map[string]*Serie, 0)
for _, changeset := range changesets {
logger := log.WithFields(logrus.Fields{
"changeset": changeset.String(),
})
logger.Debug("creating initial serie")
serie := &Serie{
ChangeSets: []*gerrit.Changeset{changeset},
}
series = append(series, serie)
mapLeafToSerie[changeset.CommitID] = serie
}
// Combine series using a fixpoint approach, with a max iteration count.
log.Debug("glueing together phase")
for i := 1; i < 100; i++ {
didUpdate := false
log.Debugf("at iteration %d", i)
for _, serie := range series {
logger := log.WithField("serie", serie.String())
parentCommitIDs, err := serie.GetParentCommitIDs()
if err != nil {
return series, err
}
if len(parentCommitIDs) != 1 {
// We can't append merge commits to other series
logger.Infof("No single parent, skipping.")
continue
}
parentCommitID := parentCommitIDs[0]
logger.Debug("Looking for a predecessor.")
// if there's another serie that has this parent as a leaf, glue together
if otherSerie, ok := mapLeafToSerie[parentCommitID]; ok {
if otherSerie == serie {
continue
}
logger := logger.WithField("otherSerie", otherSerie)
myLeafCommitID, err := serie.GetLeafCommitID()
if err != nil {
return series, err
}
// append our changesets to the other serie
logger.Debug("Splicing together.")
otherSerie.ChangeSets = append(otherSerie.ChangeSets, serie.ChangeSets...)
delete(mapLeafToSerie, parentCommitID)
mapLeafToSerie[myLeafCommitID] = otherSerie
// orphan our serie
serie.ChangeSets = []*gerrit.Changeset{}
// remove the orphaned serie from the lookup table
delete(mapLeafToSerie, myLeafCommitID)
didUpdate = true
} else {
logger.Debug("Not found.")
}
}
series = removeOrphanedSeries(series)
if !didUpdate {
log.Infof("converged after %d iterations", i)
break
}
}
// Check integrity, just to be on the safe side.
for _, serie := range series {
logger := log.WithFields(logrus.Fields{
"serie": serie.String(),
})
logger.Debugf("checking integrity")
err := serie.CheckIntegrity()
if err != nil {
logger.Errorf("checking integrity failed: %s", err)
}
}
return series, nil
}
// removeOrphanedSeries removes all empty series (that contain zero changesets)
func removeOrphanedSeries(series []*Serie) []*Serie {
newSeries := []*Serie{}
for _, serie := range series {
if len(serie.ChangeSets) != 0 {
newSeries = append(newSeries, serie)
}
}
return newSeries
}
// SortSeries sorts a list of series by the number of changesets in each serie, descending
func SortSeries(series []*Serie) []*Serie {
newSeries := make([]*Serie, len(series))
copy(newSeries, series)
sort.Slice(newSeries, func(i, j int) bool {
// compare by the number of changesets in each serie
return len(newSeries[i].ChangeSets) > len(newSeries[j].ChangeSets)
})
return newSeries
}


@@ -1,231 +0,0 @@
package submitqueue
import (
"fmt"
"github.com/tweag/gerrit-queue/gerrit"
"github.com/sirupsen/logrus"
)
// SubmitQueue contains a list of series, a gerrit connection, and some project configuration
type SubmitQueue struct {
Series []*Serie
gerrit gerrit.IClient
ProjectName string
BranchName string
HEAD string
SubmitQueueTag string // the tag used to submit something to the submit queue
URL string
}
// MakeSubmitQueue builds a new submit queue
func MakeSubmitQueue(gerritClient gerrit.IClient, projectName string, branchName string, submitQueueTag string) *SubmitQueue {
return &SubmitQueue{
Series: make([]*Serie, 0),
gerrit: gerritClient,
ProjectName: projectName,
BranchName: branchName,
SubmitQueueTag: submitQueueTag,
}
}
// LoadSeries fills .Series by searching changesets, and assembling them to Series.
func (s *SubmitQueue) LoadSeries(log *logrus.Logger) error {
var queryString = fmt.Sprintf("status:open project:%s branch:%s", s.ProjectName, s.BranchName)
log.Debugf("Running query %s", queryString)
// Download changesets from gerrit
changesets, err := s.gerrit.SearchChangesets(queryString)
if err != nil {
return err
}
// Assemble to series
series, err := AssembleSeries(changesets, log)
if err != nil {
return err
}
// Sort by size
s.Series = SortSeries(series)
return nil
}
// TODO: clear submit queue tag if missing +1/+2?
// IsAutoSubmittable returns true if a given Serie has all the necessary flags set
// meaning it would be fine to rebase and/or submit it.
// This means, every changeset needs to:
// - have the s.SubmitQueueTag hashtag
// - be verified (+1 by CI)
// - be code reviewed (+2 by a human)
func (s *SubmitQueue) IsAutoSubmittable(serie *Serie) bool {
return serie.FilterAllChangesets(func(c *gerrit.Changeset) bool {
return c.HasTag(s.SubmitQueueTag) && c.IsVerified && c.IsCodeReviewed
})
}
// GetChangesetURL returns the URL to view a given changeset
func (s *SubmitQueue) GetChangesetURL(changeset *gerrit.Changeset) string {
return fmt.Sprintf("%s/c/%s/+/%d", s.gerrit.GetBaseURL(), s.ProjectName, changeset.Number)
}
// DoSubmit submits changes that can be submitted,
// and updates `Series` to contain the remaining ones
// Also updates `HEAD`.
func (s *SubmitQueue) DoSubmit(log *logrus.Logger) error {
var remainingSeries []*Serie
// TODO: actually log more!
for _, serie := range s.Series {
serieParentCommitIDs, err := serie.GetParentCommitIDs()
if err != nil {
return err
}
// we can only submit series with a single parent commit (otherwise they're not rebased)
if len(serieParentCommitIDs) != 1 {
return fmt.Errorf("%s has more than one parent commit, skipping", serie.String())
}
// if serie is auto-submittable and rebased on top of current master…
if s.IsAutoSubmittable(serie) && serieParentCommitIDs[0] == s.HEAD {
// submit the last changeset of the series, which submits intermediate ones too
_, err := s.gerrit.SubmitChangeset(serie.ChangeSets[len(serie.ChangeSets)-1])
if err != nil {
// this might fail, for various reasons:
// - developers could have updated the changeset meanwhile, clearing +1/+2 bits
// - master might have advanced, so this changeset isn't rebased on top of master
// TODO: we currently bail out entirely, but should be fine on the
// next loop. We might later want to improve the logic to be a bit more
// smarter (like log and try with the next one)
return err
}
// advance head to the leaf of the current serie for the next iteration
newHead, err := serie.GetLeafCommitID()
if err != nil {
return err
}
s.HEAD = newHead
} else {
remainingSeries = append(remainingSeries, serie)
}
}
s.Series = remainingSeries
return nil
}
// DoRebase rebases the next auto-submittable series on top of current HEAD
// they are still ordered by series size
// After a DoRebase, consumers are supposed to fetch state again via LoadSeries,
// as things most likely have changed, and error handling during partially failed rebases
// is really tricky
func (s *SubmitQueue) DoRebase(log *logrus.Logger) error {
if s.HEAD == "" {
return fmt.Errorf("current HEAD is an empty string, bailing out")
}
for _, serie := range s.Series {
logger := log.WithFields(logrus.Fields{
"serie": serie,
})
if !s.IsAutoSubmittable(serie) {
logger.Debug("skipping non-auto-submittable series")
continue
}
logger.Infof("rebasing on top of %s", s.HEAD)
_, err := s.RebaseSerie(serie, s.HEAD)
if err != nil {
// We skip trivial rebase errors instead of bailing out.
// TODO: we might want to remove s.SubmitQueueTag from the changeset,
// but even without doing it,
// we're merely spamming, and won't get stuck trying to rebase the same
// changeset over and over again, as some other changeset will likely succeed
// with rebasing and will be merged by DoSubmit.
logger.Warnf("failure while rebasing, continuing with next one: %s", err)
continue
} else {
logger.Infof("success rebasing on top of %s", s.HEAD)
break
}
}
return nil
}
// Run starts the submit and rebase logic.
func (s *SubmitQueue) Run(fetchOnly bool) *Result {
r := MakeResult()
//TODO: log decisions made and add to some ring buffer
var err error
log := logrus.New()
log.AddHook(r)
commitID, err := s.gerrit.GetHEAD(s.ProjectName, s.BranchName)
if err != nil {
log.Errorf("Unable to retrieve HEAD of branch %s at project %s: %s", s.BranchName, s.ProjectName, err)
r.Error = err
return r
}
s.HEAD = commitID
r.HEAD = commitID
err = s.LoadSeries(log)
if err != nil {
r.Error = err
return r
}
// copy series to result object
for _, serie := range s.Series {
r.Series = append(r.Series, *serie)
}
if len(s.Series) == 0 {
// Nothing to do!
log.Warn("Nothing to do here")
return r
}
if fetchOnly {
return r
}
err = s.DoSubmit(log)
if err != nil {
r.Error = err
return r
}
err = s.DoRebase(log)
if err != nil {
r.Error = err
return r
}
return r
}
// RebaseSerie rebases a whole serie on top of a given ref
// TODO: only rebase a single changeset. we don't really want to join disconnected series, by rebasing them on top of each other.
func (s *SubmitQueue) RebaseSerie(serie *Serie, ref string) (*Serie, error) {
newSeries := &Serie{
ChangeSets: make([]*gerrit.Changeset, 0, len(serie.ChangeSets)),
}
rebaseOnto := ref
for _, changeset := range serie.ChangeSets {
newChangeset, err := s.gerrit.RebaseChangeset(changeset, rebaseOnto)
if err != nil {
// uh-oh…
// TODO: think about error handling
// TODO: remove the submit queue tag if the rebase fails (but only then, not on other errors)
return newSeries, err
}
newSeries.ChangeSets = append(newSeries.ChangeSets, newChangeset)
// the next changeset should be rebased on top of the current commit
rebaseOnto = newChangeset.CommitID
}
return newSeries, nil
}