PPS old diff vs new
New: per-pipeline event handling is factored into a single `step()` function that derives every action from the pipeline's current etcd state and RC, so each invocation is idempotent and safe to retry.

```go
// step takes 'ptr', a newly-changed pipeline pointer in etcd, and
// 1. retrieves its full pipeline spec and RC
// 2. makes whatever changes are needed to bring the RC in line with the (new) spec
// 3. updates 'ptr', if needed, to reflect the action it just took
func (a *apiServer) step(pachClient *client.APIClient, pipelineName string, keyVer, keyRev int64) error {
	log.Infof("PPS master: processing event for %q", pipelineName)

	// Retrieve pipelineInfo from the spec repo
	op, err := a.newPipelineOp(pachClient, pipelineName)
	if err != nil {
		return op.setPipelineFailure(fmt.Sprintf("couldn't initialize pipeline op: %v", err))
	}
	// Handle tracing
	span, ctx := extended.AddPipelineSpanToAnyTrace(pachClient.Ctx(),
		a.etcdClient, pipelineName, "/pps.Master/ProcessPipelineUpdate",
		"key-version", keyVer,
		"mod-revision", keyRev,
		"state", op.ptr.State.String(),
		"spec-commit", op.ptr.SpecCommit,
	)
	defer tracing.FinishAnySpan(span)
	if span != nil {
		pachClient = pachClient.WithCtx(ctx)
	}

	// Take whatever actions are needed
	switch op.ptr.State {
	case pps.PipelineState_PIPELINE_STARTING, pps.PipelineState_PIPELINE_RESTARTING:
		if op.rc != nil && !op.rcIsFresh() {
			// old RC is not down yet
			log.Errorf("PPS master: restarting %q as it has an out-of-date RC", op.name)
			return op.restartPipeline()
		} else if op.rc == nil {
			// default: old RC (if any) is down but new RC is not up yet
			if err := op.createPipelineResources(); err != nil {
				return err
			}
		}
		// trigger another event--once pipeline is RUNNING, step() will scale it up
		if op.pipelineInfo.Stopped {
			if err := op.setPipelineState(pps.PipelineState_PIPELINE_PAUSED); err != nil {
				return err
			}
		} else {
			if err := op.setPipelineState(pps.PipelineState_PIPELINE_RUNNING); err != nil {
				return err
			}
		}
	case pps.PipelineState_PIPELINE_RUNNING:
		if !op.rcIsFresh() {
			return op.restartPipeline()
		}
		op.startPipelineMonitor()

		if op.pipelineInfo.Stopped {
			// StopPipeline has been called, but pipeline hasn't been paused yet
			if err := op.scaleDownPipeline(); err != nil {
				return err
			}
			return op.setPipelineState(pps.PipelineState_PIPELINE_PAUSED)
		}
		// default: scale up if pipeline start hasn't propagated to etcd yet
		// Note: mostly this should do nothing, as this runs several times per job
		if err := op.scaleUpPipeline(); err != nil {
			return err
		}
	case pps.PipelineState_PIPELINE_STANDBY, pps.PipelineState_PIPELINE_PAUSED:
		if !op.rcIsFresh() {
			log.Errorf("PPS master: restarting %q as it has no RC", op.name)
			if err := op.restartPipeline(); err != nil {
				return err
			}
			return nil
		}
		op.startPipelineMonitor()

		if op.ptr.State == pps.PipelineState_PIPELINE_PAUSED && !op.pipelineInfo.Stopped {
			// StartPipeline has been called, but pipeline hasn't been started yet
			if err := op.scaleUpPipeline(); err != nil {
				return err
			}
			return op.setPipelineState(pps.PipelineState_PIPELINE_RUNNING)
		}
		// default: scale down if pause/standby hasn't propagated to etcd yet
		if err := op.scaleDownPipeline(); err != nil {
			return err
		}
	case pps.PipelineState_PIPELINE_FAILURE:
		// pipeline fails if docker image isn't found
		if err := op.finishPipelineOutputCommits(); err != nil {
			return err
		}
		return op.deletePipelineResources()
	}
	return nil
}
```
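For context, here is a minimal sketch (not from the gist) of the kind of thin driver loop `step()` is designed for: the loop only decodes watch events and reports failures, while all state handling lives in `step()`. The `watch.Watcher` type and the `watchPipelines` name are assumptions; the event fields (`Err`, `Ver`, `Rev`, `Unmarshal`) are borrowed from the old `master()` below.

```go
// watchPipelines is a hypothetical caller of step(), sketched to show the
// new division of labor. Everything here besides step()'s signature and
// the event fields used in the old master() is an assumption.
func (a *apiServer) watchPipelines(pachClient *client.APIClient, pipelineWatcher watch.Watcher) error {
	for event := range pipelineWatcher.Watch() {
		if event.Err != nil {
			return fmt.Errorf("event err: %+v", event.Err)
		}
		var pipelineName string
		var pipelinePtr pps.EtcdPipelineInfo
		if err := event.Unmarshal(&pipelineName, &pipelinePtr); err != nil {
			return err
		}
		// step() re-derives everything from the pipeline's current etcd
		// state and RC, so a failed call can simply be retried on the
		// next event rather than tearing down the whole watch loop.
		if err := a.step(pachClient, pipelineName, event.Ver, event.Rev); err != nil {
			log.Errorf("PPS master: failed to process event for %q: %v", pipelineName, err)
		}
	}
	return nil
}
```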
Old: `master()` handles every pipeline event inline in one long-lived watch loop, diffing the previous and current etcd values to decide what to do.

```go
// The master process is responsible for creating/deleting workers as
// pipelines are created/removed.
func (a *apiServer) master() {
	masterLock := dlock.NewDLock(a.etcdClient, path.Join(a.etcdPrefix, masterLockPath))
	backoff.RetryNotify(func() error {
		...
		// Watch for new pipeline creates/updates
		var (
			span          opentracing.Span
			oldCtx        = ctx
			oldPachClient = pachClient
		)
		defer func() {
			// Finish any dangling span
			// Note: must wrap 'tracing.FinishAnySpan(span)' in closure so that
			// 'span' is dereferenced after the "for" loop below runs (it's nil now)
			tracing.FinishAnySpan(span)
		}()
		for {
			// finish span from previous pipeline operation
			tracing.FinishAnySpan(span)
			span = nil

			select {
			case event := <-pipelineWatcher.Watch():
				if event.Err != nil {
					return fmt.Errorf("event err: %+v", event.Err)
				}
				switch event.Type {
				case watch.EventPut:
					var pipelineName string
					var prevPipelinePtr pps.EtcdPipelineInfo
					var pipelinePtr pps.EtcdPipelineInfo
					if err := event.Unmarshal(&pipelineName, &pipelinePtr); err != nil {
						return err
					}
					if event.PrevKey != nil {
						if err := event.UnmarshalPrev(&pipelineName, &prevPipelinePtr); err != nil {
							return err
						}
					}
					log.Infof("PPS master: pipeline %q: %s -> %s", pipelineName, prevPipelinePtr.State, pipelinePtr.State)
					var prevSpecCommit string
					if prevPipelinePtr.SpecCommit != nil {
						prevSpecCommit = prevPipelinePtr.SpecCommit.ID
					}
					var curSpecCommit = pipelinePtr.SpecCommit.ID

					// Handle tracing (pipelineRestarted is used to maybe delete trace)
					if span, ctx = extended.AddPipelineSpanToAnyTrace(oldCtx,
						a.etcdClient, pipelineName, "/pps.Master/ProcessPipelineUpdate",
						"key-version", event.Ver,
						"mod-revision", event.Rev,
						"prev-key", string(event.PrevKey),
						"old-state", prevPipelinePtr.State.String(),
						"old-spec-commit", prevSpecCommit,
						"new-state", pipelinePtr.State.String(),
						"new-spec-commit", curSpecCommit,
					); span != nil {
						pachClient = oldPachClient.WithCtx(ctx)
					} else {
						pachClient = oldPachClient
					}

					// Retrieve pipelineInfo (and prev pipeline's pipelineInfo) from the
					// spec repo
					var pipelineInfo, prevPipelineInfo *pps.PipelineInfo
					if err := a.sudo(pachClient, func(superUserClient *client.APIClient) error {
						var err error
						pipelineInfo, err = ppsutil.GetPipelineInfo(superUserClient, &pipelinePtr)
						if err != nil {
							return err
						}

						if prevPipelinePtr.SpecCommit != nil {
							prevPipelineInfo, err = ppsutil.GetPipelineInfo(superUserClient, &prevPipelinePtr)
							if err != nil {
								return err
							}
						}
						return nil
					}); err != nil {
						return fmt.Errorf("watch event had no pipelineInfo: %v", err)
					}

					// True if the pipeline has a git input
					var hasGitInput bool
					pps.VisitInput(pipelineInfo.Input, func(input *pps.Input) {
						if input.Git != nil {
							hasGitInput = true
						}
					})
					// True if the user has called StopPipeline, but the PPS master hasn't
					// processed the update yet (PPS master sets the pipeline state to
					// PAUSED)
					pipelinePartiallyPaused := pipelineInfo.Stopped &&
						pipelineInfo.State != pps.PipelineState_PIPELINE_PAUSED
					// True if the pipeline has been restarted (regardless of any change
					// to the pipeline spec)
					pipelineRestarted := !pipelineInfo.Stopped &&
						event.PrevKey != nil && prevPipelineInfo.Stopped
					// True if auth has been activated or deactivated
					authActivationChanged := (pipelinePtr.AuthToken == "") !=
						(prevPipelinePtr.AuthToken == "")
					// True if the pipeline has been created or updated
					pipelineNewSpecCommit := func() bool {
						var prevSpecCommit string
						if prevPipelinePtr.SpecCommit != nil {
							prevSpecCommit = prevPipelinePtr.SpecCommit.ID
						}
						return pipelinePtr.SpecCommit.ID != prevSpecCommit &&
							!pipelineInfo.Stopped
					}()

					// Handle cases where pipeline isn't running anymore
					if pipelineInfo.State == pps.PipelineState_PIPELINE_FAILURE {
						// pipeline fails if docker image isn't found OR if pipeline RC is
						// missing
						if err := a.finishPipelineOutputCommits(pachClient, pipelineInfo); err != nil {
							return err
						}
						if err := a.deletePipelineResources(ctx, pipelineName); err != nil {
							return err
						}
						continue
					} else if pipelineInfo.State == pps.PipelineState_PIPELINE_PAUSED {
						continue // pipeline has already been paused -- nothing to do
					} else if pipelinePartiallyPaused {
						// Clusters can get into broken state where pipeline exists but
						// RC is deleted by user--causes master() to crashloop trying to
						// scale up/down workers repeatedly. Break the cycle here
						if err := a.scaleDownWorkersForPipeline(ctx, pipelineInfo); err != nil {
							if failErr := a.setPipelineFailure(ctx, pipelineInfo.Pipeline.Name, err.Error()); failErr != nil {
								return failErr
							}
							return err
						}
						// Mark the pipeline paused, thus fully stopping it (Note: this will
						// generate another etcd event, which is ignored below)
						if err := a.setPipelineState(pachClient, pipelineInfo, pps.PipelineState_PIPELINE_PAUSED, ""); err != nil {
							return err
						}
						continue
					}

					// Handle cases where pipeline is still running
					if pipelineRestarted || authActivationChanged || pipelineNewSpecCommit {
						if (pipelineNewSpecCommit || authActivationChanged) && event.PrevKey != nil {
							if err := a.deletePipelineResources(ctx, prevPipelineInfo.Pipeline.Name); err != nil {
								return err
							}
						}
						if (pipelineNewSpecCommit || pipelineRestarted) && hasGitInput {
							if err := a.checkOrDeployGithookService(ctx); err != nil {
								return err
							}
						}
						if err := a.upsertWorkersForPipeline(ctx, pipelineInfo); err != nil {
							log.Errorf("error upserting workers for new/restarted pipeline %q: %v", pipelineName, err)
							if err := a.setPipelineState(pachClient, pipelineInfo, pps.PipelineState_PIPELINE_STARTING, fmt.Sprintf("failed to create workers: %s", err.Error())); err != nil {
								return err
							}
							// We return the error here, this causes us to go
							// into backoff and try again from scratch. This
							// means that we'll try creating this pipeline
							// again and also gives a chance for another node,
							// which might actually be able to talk to k8s, to
							// get a chance at creating the workers.
							return err
						}
					}
					if pipelineInfo.State == pps.PipelineState_PIPELINE_RUNNING {
						if err := a.scaleUpWorkersForPipeline(ctx, pipelineInfo); err != nil {
							if isNotFoundErr(err) {
								// See note above (under "if pipelinePartiallyPaused")
								if failErr := a.setPipelineFailure(ctx, pipelineInfo.Pipeline.Name, err.Error()); failErr != nil {
									return failErr
								}
							}
							return err
						}
					}
					if pipelineInfo.State == pps.PipelineState_PIPELINE_STANDBY {
						if err := a.scaleDownWorkersForPipeline(ctx, pipelineInfo); err != nil {
							// See note above (under "if pipelinePartiallyPaused")
							if failErr := a.setPipelineFailure(ctx, pipelineInfo.Pipeline.Name, err.Error()); failErr != nil {
								return failErr
							}
							return err
						}
					}
				}
			case event := <-watchChan:
				...
			}
		}
	}, backoff.NewInfiniteBackOff(), func(err error, d time.Duration) error {
		...
	})
	panic("internal error: PPS master has somehow exited. Restarting pod...")
}
```
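The structural difference between the two versions: `master()` decides what to do by diffing the previous and current etcd values for a pipeline (`pipelineRestarted`, `authActivationChanged`, `pipelineNewSpecCommit`), so its correctness depends on observing every transition, while `step()` reconciles against whatever state it currently finds in etcd and Kubernetes (`rcIsFresh()`), which is what makes repeated runs harmless ("mostly this should do nothing, as this runs several times per job").

Both versions sit inside the same distributed-lock/backoff skeleton, whose body is elided (`...`) above. Below is a rough sketch of that pattern only: `NewDLock`, `RetryNotify`, `NewInfiniteBackOff`, and the notify signature appear in the listing, but `masterLock.Lock`/`Unlock` are assumed signatures and `runMaster` itself is a hypothetical name, not code from this gist.

```go
// runMaster sketches the dlock + backoff.RetryNotify skeleton that
// master() runs under; the dlock method signatures are assumptions.
func (a *apiServer) runMaster() {
	masterLock := dlock.NewDLock(a.etcdClient, path.Join(a.etcdPrefix, masterLockPath))
	backoff.RetryNotify(func() error {
		// Only one pachd pod should run the master loop at a time, so
		// acquire a distributed lock first; if the backing etcd lease is
		// lost, the returned ctx is cancelled (assumed behavior).
		ctx, err := masterLock.Lock(context.Background())
		if err != nil {
			return err
		}
		defer masterLock.Unlock(ctx)
		// ... create pipelineWatcher and run the event loop ...
		return errors.New("watch loop exited unexpectedly")
	}, backoff.NewInfiniteBackOff(), func(err error, d time.Duration) error {
		// Returning nil keeps retrying after d; with an infinite backoff,
		// RetryNotify never returns, so the panic at the end of master()
		// should be unreachable in normal operation.
		log.Errorf("PPS master: error in master loop: %v; retrying in %v", err, d)
		return nil
	})
	panic("internal error: PPS master has somehow exited. Restarting pod...")
}
```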