-
-
Save jermdavis/4dcf6568d5cd44f03d0ec503620ac177 to your computer and use it in GitHub Desktop.
| using System; | |
| namespace StronglyTypedPipelines | |
| { | |
| /// <summary> | |
| /// A base interface required so that reflection code can create a Step from its type name, | |
| /// without needing to understand its generic parameters first. | |
| /// </summary> | |
| public interface IPipelineStep | |
| { | |
| } | |
| /// <summary> | |
| /// Base type for individual pipeline steps. | |
| /// Descendants of this type map an input value to an output value. | |
| /// The input and output types can differ. | |
| /// </summary> | |
| public interface IPipelineStep<INPUT, OUTPUT> : IPipelineStep | |
| { | |
| OUTPUT Process(INPUT input); | |
| } | |
| /// <summary> | |
| /// An extension method for combining PipelineSteps together into a data flow. | |
| /// </summary> | |
| public static class PipelineStepExtensions | |
| { | |
| public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step) | |
| { | |
| return step.Process(input); | |
| } | |
| } | |
| /// <summary> | |
| /// The base type for a complete pipeline. | |
| /// Descendant types can use their constructor to compile a set of PipelineSteps together | |
| /// the PipelineStepExtensions.Step() method, and assign this to the PipelineSteps property here. | |
| /// The initial and final types of the set of steps must match the input and output types of this class, | |
| /// but the intermediate types can vary. | |
| /// </summary> | |
| public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> | |
| { | |
| public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; } | |
| public OUTPUT Process(INPUT input) | |
| { | |
| return PipelineSteps(input); | |
| } | |
| } | |
| } |
| namespace StronglyTypedPipelines | |
| { | |
| public class BasicPipeline : Pipeline<int, float> | |
| { | |
| public BasicPipeline() | |
| { | |
| PipelineSteps = input => input | |
| .Step(new DoubleStep()) | |
| .Step(new ToStringStep()) | |
| .Step(new DuplicateStep()) | |
| .Step(new ToFloatStep()); | |
| } | |
| } | |
| public class InnerPipeline : Pipeline<int, int> | |
| { | |
| public InnerPipeline() | |
| { | |
| PipelineSteps = input => input | |
| .Step(new ThirdStep()) | |
| .Step(new RoundStep()); | |
| } | |
| } | |
| public class NestedPipeline : Pipeline<int, string> | |
| { | |
| public NestedPipeline() | |
| { | |
| PipelineSteps = input => input | |
| .Step(new DoubleStep()) | |
| .Step(new InnerPipeline()) | |
| .Step(new ToStringStep()); | |
| } | |
| } | |
| public class BranchingPipeline : Pipeline<int, string> | |
| { | |
| public BranchingPipeline() | |
| { | |
| PipelineSteps = input => input | |
| .Step(new OptionalStep<int, int>( | |
| f => f > 50, | |
| new InnerPipeline() | |
| )) | |
| .Step(new ChoiceStep<int, int>( | |
| f => f > 100, | |
| new HalfStep(), | |
| new DoubleStep() | |
| )) | |
| .Step(new ToStringStep()); | |
| } | |
| } | |
| } |
| namespace StronglyTypedPipelines | |
| { | |
| public class DoubleStep : IPipelineStep<int, int> | |
| { | |
| public int Process(int input) | |
| { | |
| return input * 2; | |
| } | |
| } | |
| public class HalfStep : IPipelineStep<int,int> | |
| { | |
| public int Process(int input) | |
| { | |
| return input / 2; | |
| } | |
| } | |
| public class ThirdStep : IPipelineStep<int, float> | |
| { | |
| public float Process(int input) | |
| { | |
| return input / 3f; | |
| } | |
| } | |
| public class RoundStep : IPipelineStep<float, int> | |
| { | |
| public int Process(float input) | |
| { | |
| return (int)input; | |
| } | |
| } | |
| public class ToStringStep : IPipelineStep<int, string> | |
| { | |
| public string Process(int input) | |
| { | |
| return input.ToString(); | |
| } | |
| } | |
| public class DuplicateStep : IPipelineStep<string, string> | |
| { | |
| public string Process(string input) | |
| { | |
| return input + "." + input; | |
| } | |
| } | |
| public class ToFloatStep : IPipelineStep<string, float> | |
| { | |
| public float Process(string input) | |
| { | |
| return float.Parse(input); | |
| } | |
| } | |
| } |
| using System; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| using System.Reflection; | |
| using System.Xml.Linq; | |
| namespace StronglyTypedPipelines | |
| { | |
| public abstract class ConfigBasedPipeline<INPUT, OUTPUT> : Pipeline<INPUT, OUTPUT> | |
| { | |
| public ConfigBasedPipeline(XElement pipelineXml) | |
| { | |
| if (pipelineXml == null) | |
| { | |
| throw new ArgumentNullException(nameof(pipelineXml)); | |
| } | |
| var pipelineSteps = parsePipelineSteps(pipelineXml); | |
| validatePipelineSteps(pipelineSteps); | |
| PipelineSteps = input => processPipelineSteps(pipelineSteps, input); | |
| } | |
| private OUTPUT processPipelineSteps(IList<IPipelineStep> pipelineSteps, INPUT input) | |
| { | |
| object output = input; | |
| foreach (IPipelineStep step in pipelineSteps) | |
| { | |
| MethodInfo mi = step.GetType().GetMethod("Process", BindingFlags.Public | BindingFlags.Instance); | |
| output = mi.Invoke(step, new[] { output }); | |
| } | |
| return (OUTPUT)output; | |
| } | |
| private IList<IPipelineStep> parsePipelineSteps(XElement pipelineXml) | |
| { | |
| var pipeline = new List<IPipelineStep>(); | |
| foreach (var xStep in pipelineXml.Elements("step")) | |
| { | |
| string typeName = xStep.Attribute("type").Value; | |
| var type = Type.GetType(typeName); | |
| var ctr = type.GetConstructor(Type.EmptyTypes); | |
| var obj = (IPipelineStep)ctr.Invoke(Type.EmptyTypes); | |
| pipeline.Add(obj); | |
| } | |
| return pipeline; | |
| } | |
| private void validatePipelineSteps(IList<IPipelineStep> pipelineSteps) | |
| { | |
| int stepNumber = 0; | |
| Type pipelineBaseInterface = this.GetType().GetInterface("IPipelineStep`2"); | |
| Type currentInputType = pipelineBaseInterface.GenericTypeArguments[0]; | |
| Type outputType = pipelineBaseInterface.GenericTypeArguments[1]; | |
| foreach (var step in pipelineSteps) | |
| { | |
| stepNumber += 1; | |
| Type stepBaseInterface = step.GetType().GetInterface("IPipelineStep`2"); | |
| Type stepInType = stepBaseInterface.GenericTypeArguments[0]; | |
| Type stepOutType = stepBaseInterface.GenericTypeArguments[1]; | |
| if (currentInputType != stepInType) | |
| { | |
| string msg = "Step #{0} {1} input type {2} does not match current type {3}."; | |
| throw new InvalidOperationException(string.Format(msg, stepNumber, step.GetType().Name, stepInType.Name, currentInputType.Name)); | |
| } | |
| currentInputType = stepOutType; | |
| } | |
| if (currentInputType != outputType) | |
| { | |
| string msg = "Final step #{0} {1} output type {2} does not equal output of pipeline {3}."; | |
| throw new InvalidOperationException(string.Format(msg, stepNumber, pipelineSteps.Last().GetType().Name, currentInputType.Name, outputType.Name)); | |
| } | |
| } | |
| } | |
| public class ExampleConfigBasedPipeline : ConfigBasedPipeline<int, string> | |
| { | |
| public ExampleConfigBasedPipeline(XElement pipelineXml) : base(pipelineXml) | |
| { | |
| } | |
| } | |
| } |
| <?xml version="1.0" encoding="utf-8" ?> | |
| <pipeline name="example"> | |
| <step type="StronglyTypedPipelines.DoubleStep, StronglyTypedPipelines" /> | |
| <!-- Will error: <step type="StronglyTypedPipelines.ThirdStep, StronglyTypedPipelines"/> --> | |
| <step type="StronglyTypedPipelines.ToStringStep, StronglyTypedPipelines" /> | |
| <step type="StronglyTypedPipelines.DuplicateStep, StronglyTypedPipelines" /> | |
| <!-- Will error: <step type="StronglyTypedPipelines.ToFloatStep, StronglyTypedPipelines"/> --> | |
| </pipeline> |
| using System; | |
| namespace StronglyTypedPipelines | |
| { | |
| public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT | |
| { | |
| private IPipelineStep<INPUT, OUTPUT> _step; | |
| private Func<INPUT, bool> _choice; | |
| public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step) | |
| { | |
| _choice = choice; | |
| _step = step; | |
| } | |
| public OUTPUT Process(INPUT input) | |
| { | |
| if (_choice(input)) | |
| { | |
| return _step.Process(input); | |
| } | |
| else | |
| { | |
| return input; | |
| } | |
| } | |
| } | |
| public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT | |
| { | |
| private IPipelineStep<INPUT, OUTPUT> _first; | |
| private IPipelineStep<INPUT, OUTPUT> _second; | |
| private Func<INPUT, bool> _choice; | |
| public ChoiceStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> first, IPipelineStep<INPUT, OUTPUT> second) | |
| { | |
| _choice = choice; | |
| _first = first; | |
| _second = second; | |
| } | |
| public OUTPUT Process(INPUT input) | |
| { | |
| if (_choice(input)) | |
| { | |
| return _first.Process(input); | |
| } | |
| else | |
| { | |
| return _second.Process(input); | |
| } | |
| } | |
| } | |
| } |
| using System; | |
| using System.Collections.Generic; | |
| using System.Linq.Expressions; | |
| using System.Xml.Linq; | |
| namespace StronglyTypedPipelines | |
| { | |
| class Program | |
| { | |
| private static void BasicPipelineTest() | |
| { | |
| Console.WriteLine("Basic Pipeline Test"); | |
| var input = 49; | |
| Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name)); | |
| var pipeline = new BasicPipeline(); | |
| var output = pipeline.Process(input); | |
| Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name)); | |
| Console.WriteLine(); | |
| } | |
| private static void NestedPipelineTest() | |
| { | |
| Console.WriteLine("Nested Pipeline Test"); | |
| var input = 103; | |
| Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name)); | |
| var pipeline = new NestedPipeline(); | |
| var output = pipeline.Process(input); | |
| Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name)); | |
| Console.WriteLine(); | |
| } | |
| private static void BranchingPipelineTest() | |
| { | |
| Console.WriteLine("Branching Pipeline Test"); | |
| foreach(int input in new int[] { 1, 10, 100, 1000 }) | |
| { | |
| Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name)); | |
| var pipeline = new BranchingPipeline(); | |
| var output = pipeline.Process(input); | |
| Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name)); | |
| } | |
| Console.WriteLine(); | |
| } | |
| private static void ModifiablePipelineTest() | |
| { | |
| Console.WriteLine("Configured Pipeline Test"); | |
| var input = 13; | |
| Console.WriteLine(string.Format("Input Value: {0} [{1}]", input, input.GetType().Name)); | |
| XDocument xd = XDocument.Load("ConfigBasedPipeline.xml"); | |
| // | |
| // Patching the configuration data would go here | |
| // | |
| XElement pipelineXml = xd.Document.Element("pipeline"); | |
| try | |
| { | |
| var pipeline = new ExampleConfigBasedPipeline(pipelineXml); | |
| var output = pipeline.Process(input); | |
| Console.WriteLine(string.Format("Output Value: {0} [{1}]", output, output.GetType().Name)); | |
| } | |
| catch(Exception ex) | |
| { | |
| Console.WriteLine("ERROR: " + ex.Message); | |
| } | |
| Console.WriteLine(); | |
| } | |
| static void Main(string[] args) | |
| { | |
| BasicPipelineTest(); | |
| NestedPipelineTest(); | |
| BranchingPipelineTest(); | |
| ModifiablePipelineTest(); | |
| } | |
| } | |
| } |
Hi Jeremy!
Sorry for the delay... I've not been feeling well the last few days (not Covid, though)
Yeah, you nailed it: it was not the order, but the async behaviour instead (wow, I'm really confused by all of this, lol...)
As I said, I'm new to C#/.NET so thanks again for your patience.
I did include your latest suggestion for AsyncPipelineStepExtensions and now works beautiful!
I think you really should include this Async approach to your gists... It helped me a lot, and can be of use for other devs too!
Amazing job, I really appreciate it! =D
gonna keep testing new scenarios, but I think most of the infrastructure is now solid enough to go on.
Thanks!!!! =)
Yeah totally agree the behaviour of async can be confusing. There are good books about this stuff though, if you want to learn a bit more - "C# In Depth" by Jon Skeet covers useful detail about the hows & whys of async. And it also covers lots of other helpful stuff for people new to the C# language overall. Maybe work will let you put a copy on expenses for learning?
Glad you're getting on top of all this though - happy have been able to help. Certainly not going to waste all this interesting material - what I've worked out will end up on my blog and I'll add new gists for the changed code as well.
Actually - having thought about this harder, I think there's a better solution that makes all this simpler. We can move all the logic for handling a
Task<TIn>into the extension method class, and simplify everything else...Instead of the second batch of code from the previous comment...
and the individual example steps end up looking like:
That gives the same overall behaviour, but doesn't require an abstract base for pipeline steps, and doesn't suffer from the "putting code before the await for the input" issue I mentioned before.