@msteffen
Last active May 4, 2019
PPS master diff: new step()-based code vs. old master() watch loop
--- New: step() ---

// step takes 'ptr', a newly-changed pipeline pointer in etcd, and
// 1. retrieves its full pipeline spec and RC
// 2. makes whatever changes are needed to bring the RC in line with the (new) spec
// 3. updates 'ptr', if needed, to reflect the action it just took
func (a *apiServer) step(pachClient *client.APIClient, pipelineName string, keyVer, keyRev int64) error {
    // Handle tracing (pipelineRestarted is used to maybe delete trace)
    log.Infof("PPS master: processing event for %q", pipelineName)

    // Retrieve pipelineInfo from the spec repo
    op, err := a.newPipelineOp(pachClient, pipelineName)
    if err != nil {
        return op.setPipelineFailure(fmt.Sprintf("couldn't initialize pipeline op: %v", err))
    }
    span, ctx := extended.AddPipelineSpanToAnyTrace(pachClient.Ctx(),
        a.etcdClient, pipelineName, "/pps.Master/ProcessPipelineUpdate",
        "key-version", keyVer,
        "mod-revision", keyRev,
        "state", op.ptr.State.String(),
        "spec-commit", op.ptr.SpecCommit,
    )
    defer tracing.FinishAnySpan(span)
    if span != nil {
        pachClient = pachClient.WithCtx(ctx)
    }

    // Take whatever actions are needed
    switch op.ptr.State {
    case pps.PipelineState_PIPELINE_STARTING, pps.PipelineState_PIPELINE_RESTARTING:
        if op.rc != nil && !op.rcIsFresh() {
            // old RC is not down yet
            log.Errorf("PPS master: restarting %q as it has an out-of-date RC", op.name)
            return op.restartPipeline()
        } else if op.rc == nil {
            // default: old RC (if any) is down but new RC is not up yet
            if err := op.createPipelineResources(); err != nil {
                return err
            }
        }
        // trigger another event--once pipeline is RUNNING, step() will scale it up
        if op.pipelineInfo.Stopped {
            if err := op.setPipelineState(pps.PipelineState_PIPELINE_PAUSED); err != nil {
                return err
            }
        } else {
            if err := op.setPipelineState(pps.PipelineState_PIPELINE_RUNNING); err != nil {
                return err
            }
        }
    case pps.PipelineState_PIPELINE_RUNNING:
        if !op.rcIsFresh() {
            return op.restartPipeline()
        }
        op.startPipelineMonitor()

        if op.pipelineInfo.Stopped {
            // StopPipeline has been called, but pipeline hasn't been paused yet
            if err := op.scaleDownPipeline(); err != nil {
                return err
            }
            return op.setPipelineState(pps.PipelineState_PIPELINE_PAUSED)
        }
        // default: scale up if pipeline start hasn't propagated to etcd yet
        // Note: mostly this should do nothing, as this runs several times per job
        if err := op.scaleUpPipeline(); err != nil {
            return err
        }
    case pps.PipelineState_PIPELINE_STANDBY, pps.PipelineState_PIPELINE_PAUSED:
        if !op.rcIsFresh() {
            log.Errorf("PPS master: restarting %q as it has no RC", op.name)
            if err := op.restartPipeline(); err != nil {
                return err
            }
            return nil
        }
        op.startPipelineMonitor()

        if op.ptr.State == pps.PipelineState_PIPELINE_PAUSED && !op.pipelineInfo.Stopped {
            // StartPipeline has been called, but pipeline hasn't been started yet
            if err := op.scaleUpPipeline(); err != nil {
                return err
            }
            return op.setPipelineState(pps.PipelineState_PIPELINE_RUNNING)
        }
        // default: scale down if pause/standby hasn't propagated to etcd yet
        if err := op.scaleDownPipeline(); err != nil {
            return err
        }
    case pps.PipelineState_PIPELINE_FAILURE:
        // pipeline fails if docker image isn't found
        if err := op.finishPipelineOutputCommits(); err != nil {
            return err
        }
        return op.deletePipelineResources()
    }
    return nil
}

--- Old: master() ---

// The master process is responsible for creating/deleting workers as
// pipelines are created/removed.
func (a *apiServer) master() {
    masterLock := dlock.NewDLock(a.etcdClient, path.Join(a.etcdPrefix, masterLockPath))
    backoff.RetryNotify(func() error {
        ...
        // Watch for new pipeline creates/updates
        var (
            span          opentracing.Span
            oldCtx        = ctx
            oldPachClient = pachClient
        )
        defer func() {
            // Finish any dangling span
            // Note: must wrap 'tracing.FinishAnySpan(span)' in closure so that
            // 'span' is dereferenced after the "for" loop below runs (it's nil now)
            tracing.FinishAnySpan(span) // finish any dangling span
        }()
        for {
            // finish span from previous pipeline operation
            tracing.FinishAnySpan(span)
            span = nil

            select {
            case event := <-pipelineWatcher.Watch():
                if event.Err != nil {
                    return fmt.Errorf("event err: %+v", event.Err)
                }
                switch event.Type {
                case watch.EventPut:
                    var pipelineName string
                    var prevPipelinePtr pps.EtcdPipelineInfo
                    var pipelinePtr pps.EtcdPipelineInfo
                    if err := event.Unmarshal(&pipelineName, &pipelinePtr); err != nil {
                        return err
                    }
                    if event.PrevKey != nil {
                        if err := event.UnmarshalPrev(&pipelineName, &prevPipelinePtr); err != nil {
                            return err
                        }
                    }
                    log.Infof("PPS master: pipeline %q: %s -> %s", pipelineName, prevPipelinePtr.State, pipelinePtr.State)
                    var prevSpecCommit string
                    if prevPipelinePtr.SpecCommit != nil {
                        prevSpecCommit = prevPipelinePtr.SpecCommit.ID
                    }
                    var curSpecCommit = pipelinePtr.SpecCommit.ID

                    // Handle tracing (pipelineRestarted is used to maybe delete trace)
                    if span, ctx = extended.AddPipelineSpanToAnyTrace(oldCtx,
                        a.etcdClient, pipelineName, "/pps.Master/ProcessPipelineUpdate",
                        "key-version", event.Ver,
                        "mod-revision", event.Rev,
                        "prev-key", string(event.PrevKey),
                        "old-state", prevPipelinePtr.State.String(),
                        "old-spec-commit", prevSpecCommit,
                        "new-state", pipelinePtr.State.String(),
                        "new-spec-commit", curSpecCommit,
                    ); span != nil {
                        pachClient = oldPachClient.WithCtx(ctx)
                    } else {
                        pachClient = oldPachClient
                    }

                    // Retrieve pipelineInfo (and prev pipeline's pipelineInfo) from the
                    // spec repo
                    var pipelineInfo, prevPipelineInfo *pps.PipelineInfo
                    if err := a.sudo(pachClient, func(superUserClient *client.APIClient) error {
                        var err error
                        pipelineInfo, err = ppsutil.GetPipelineInfo(superUserClient, &pipelinePtr)
                        if err != nil {
                            return err
                        }

                        if prevPipelinePtr.SpecCommit != nil {
                            prevPipelineInfo, err = ppsutil.GetPipelineInfo(superUserClient, &prevPipelinePtr)
                            if err != nil {
                                return err
                            }
                        }
                        return nil
                    }); err != nil {
                        return fmt.Errorf("watch event had no pipelineInfo: %v", err)
                    }

                    // True if the pipeline has a git input
                    var hasGitInput bool
                    pps.VisitInput(pipelineInfo.Input, func(input *pps.Input) {
                        if input.Git != nil {
                            hasGitInput = true
                        }
                    })
                    // True if the user has called StopPipeline, but the PPS master hasn't
                    // processed the update yet (PPS master sets the pipeline state to
                    // PAUSED)
                    pipelinePartiallyPaused := pipelineInfo.Stopped &&
                        pipelineInfo.State != pps.PipelineState_PIPELINE_PAUSED
                    // True if the pipeline has been restarted (regardless of any change
                    // to the pipeline spec)
                    pipelineRestarted := !pipelineInfo.Stopped &&
                        event.PrevKey != nil && prevPipelineInfo.Stopped
                    // True if auth has been activated or deactivated
                    authActivationChanged := (pipelinePtr.AuthToken == "") !=
                        (prevPipelinePtr.AuthToken == "")
                    // True if the pipeline has been created or updated
                    pipelineNewSpecCommit := func() bool {
                        var prevSpecCommit string
                        if prevPipelinePtr.SpecCommit != nil {
                            prevSpecCommit = prevPipelinePtr.SpecCommit.ID
                        }
                        return pipelinePtr.SpecCommit.ID != prevSpecCommit &&
                            !pipelineInfo.Stopped
                    }()

                    // Handle cases where pipeline isn't running anymore
                    if pipelineInfo.State == pps.PipelineState_PIPELINE_FAILURE {
                        // pipeline fails if docker image isn't found OR if pipeline RC is
                        // missing
                        if err := a.finishPipelineOutputCommits(pachClient, pipelineInfo); err != nil {
                            return err
                        }
                        if err := a.deletePipelineResources(ctx, pipelineName); err != nil {
                            return err
                        }
                        continue
                    } else if pipelineInfo.State == pps.PipelineState_PIPELINE_PAUSED {
                        continue // pipeline has already been paused -- nothing to do
                    } else if pipelinePartiallyPaused {
                        // Clusters can get into broken state where pipeline exists but
                        // RC is deleted by user--causes master() to crashloop trying to
                        // scale up/down workers repeatedly. Break the cycle here
                        if err := a.scaleDownWorkersForPipeline(ctx, pipelineInfo); err != nil {
                            if failErr := a.setPipelineFailure(ctx, pipelineInfo.Pipeline.Name, err.Error()); failErr != nil {
                                return failErr
                            }
                            return err
                        }
                        // Mark the pipeline paused, thus fully stopping it (Note: this will
                        // generate another etcd event, which is ignored below)
                        if err := a.setPipelineState(pachClient, pipelineInfo, pps.PipelineState_PIPELINE_PAUSED, ""); err != nil {
                            return err
                        }
                        continue
                    }

                    // Handle cases where pipeline is still running
                    if pipelineRestarted || authActivationChanged || pipelineNewSpecCommit {
                        if (pipelineNewSpecCommit || authActivationChanged) && event.PrevKey != nil {
                            if err := a.deletePipelineResources(ctx, prevPipelineInfo.Pipeline.Name); err != nil {
                                return err
                            }
                        }
                        if (pipelineNewSpecCommit || pipelineRestarted) && hasGitInput {
                            if err := a.checkOrDeployGithookService(ctx); err != nil {
                                return err
                            }
                        }
                        if err := a.upsertWorkersForPipeline(ctx, pipelineInfo); err != nil {
                            log.Errorf("error upserting workers for new/restarted pipeline %q: %v", pipelineName, err)
                            if err := a.setPipelineState(pachClient, pipelineInfo, pps.PipelineState_PIPELINE_STARTING, fmt.Sprintf("failed to create workers: %s", err.Error())); err != nil {
                                return err
                            }
                            // We return the error here, this causes us to go
                            // into backoff and try again from scratch. This
                            // means that we'll try creating this pipeline
                            // again and also gives a chance for another node,
                            // which might actually be able to talk to k8s, to
                            // get a chance at creating the workers.
                            return err
                        }
                    }
                    if pipelineInfo.State == pps.PipelineState_PIPELINE_RUNNING {
                        if err := a.scaleUpWorkersForPipeline(ctx, pipelineInfo); err != nil {
                            if isNotFoundErr(err) {
                                // See note above (under "if pipelinePartiallyStopped")
                                if failErr := a.setPipelineFailure(ctx, pipelineInfo.Pipeline.Name, err.Error()); failErr != nil {
                                    return failErr
                                }
                            }
                            return err
                        }
                    }
                    if pipelineInfo.State == pps.PipelineState_PIPELINE_STANDBY {
                        if err := a.scaleDownWorkersForPipeline(ctx, pipelineInfo); err != nil {
                            // See note above (under "if pipelinePartiallyStopped")
                            if failErr := a.setPipelineFailure(ctx, pipelineInfo.Pipeline.Name, err.Error()); failErr != nil {
                                return failErr
                            }
                            return err
                        }
                    }
                }
            case event := <-watchChan:
                ...
            }
        }
    }, backoff.NewInfiniteBackOff(), func(err error, d time.Duration) error {
        ...
    })
    panic("internal error: PPS master has somehow exited. Restarting pod...")
}
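The diff shows the new step() but elides the loop that feeds it. The following is a minimal, self-contained sketch of the dispatch pattern the refactor enables, not Pachyderm source: pipelineEvent, runMaster, and the plain time.Sleep retry are invented stand-ins, where the real code unmarshals pps.EtcdPipelineInfo from etcd watch events and retries via backoff.RetryNotify. The point is that the loop only routes events, while every state transition lives in one per-pipeline function.

package main

import (
    "fmt"
    "log"
    "time"
)

// pipelineEvent is a hypothetical stand-in for the etcd watch events above.
type pipelineEvent struct {
    Name string
    Ver  int64 // key-version, passed to step() as keyVer
    Rev  int64 // mod-revision, passed to step() as keyRev
    Err  error
}

// step is a stand-in for (*apiServer).step: it receives one event and is
// solely responsible for converging that pipeline's state.
func step(name string, keyVer, keyRev int64) error {
    log.Printf("processing event for %q (ver=%d rev=%d)", name, keyVer, keyRev)
    return nil
}

// runMaster routes each event to step() and retries the loop on error; all
// per-pipeline logic stays out of the watch loop.
func runMaster(events <-chan pipelineEvent) {
    for {
        err := func() error {
            for ev := range events {
                if ev.Err != nil {
                    return fmt.Errorf("event err: %+v", ev.Err)
                }
                if err := step(ev.Name, ev.Ver, ev.Rev); err != nil {
                    return err
                }
            }
            return nil
        }()
        if err == nil {
            return // event channel closed
        }
        log.Printf("master loop error: %v; retrying", err)
        time.Sleep(time.Second) // the real code uses backoff.RetryNotify
    }
}

func main() {
    events := make(chan pipelineEvent, 2)
    events <- pipelineEvent{Name: "edges", Ver: 1, Rev: 42}
    events <- pipelineEvent{Name: "montage", Ver: 3, Rev: 57}
    close(events)
    runMaster(events)
}

Compared with the old master() above, each update is easy to retry and to trace in isolation, since step() receives everything it needs (the pipeline name plus keyVer and keyRev) as arguments rather than reading state accumulated inside the watch loop.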