Monday, September 24, 2018

CSharp - Weighted Semaphore - Does It Exist?

TL;DR - Is there a Semaphore-like class that I can await while specifying that a given task is more resource-intensive than others? My current limiting factor is memory usage in a data audit between two different data sources (ElasticSearch and SQL Server).

I'm writing a rudimentary test runner application. I define test classes with a single static method that instantiates its own class based on some generic configuration information specific to that test. A long time ago I opted to build my own test runner rather than use xUnit or NUnit, because the parallelism they offered didn't suit my needs, but the design is similar: I mark the static method with a Fact attribute, and my test runner finds it and executes it via Activator.CreateInstance.
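The discovery-and-invoke step described above can be sketched with plain reflection. This is a minimal illustration, not the author's runner: FactAttribute here is a hypothetical local stand-in for xUnit's attribute, and SampleTests and MiniRunner are invented names. Static methods are invoked with a null target, while instance methods get a fresh object from Activator.CreateInstance, assuming a public parameterless constructor.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical local marker attribute standing in for xUnit's [Fact].
[AttributeUsage(AttributeTargets.Method)]
public sealed class FactAttribute : Attribute { }

// Hypothetical test class used only to exercise the runner.
public class SampleTests
{
    public static int RunCount;

    [Fact]
    public static void StaticTest() => RunCount++;

    [Fact]
    public void InstanceTest() => RunCount++;

    // No [Fact], so the runner must skip it.
    public static void Helper() => RunCount += 100;
}

public static class MiniRunner
{
    // Every public static or instance method declared in the assembly that carries attributeType.
    public static List<MethodInfo> FindByAttribute(Assembly assembly, Type attributeType) =>
        assembly.GetTypes()
            .SelectMany(t => t.GetMethods(BindingFlags.Public | BindingFlags.Static |
                                          BindingFlags.Instance | BindingFlags.DeclaredOnly))
            .Where(m => m.IsDefined(attributeType, false))
            .ToList();

    public static void RunAll(IEnumerable<MethodInfo> methods)
    {
        foreach (var method in methods)
        {
            // Static methods need no target; instance methods need a new object
            // (this assumes a public parameterless constructor).
            var target = method.IsStatic ? null : Activator.CreateInstance(method.DeclaringType);
            method.Invoke(target, null);
        }
    }
}
```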

Note: Every class whose name contains "Wrap" is a static, Singleton-pattern wrapper around a specific shared resource, such as logging or configuration, so that I can add properties for use in testing.
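One common way to build such a wrapper is sketched below, under assumptions: SettingsWrap, its Get/Set members and the DryRun property are hypothetical, and a dictionary stands in for the real resource. Lazy<T> with its default thread-safety mode creates the single instance exactly once, even when tests running in parallel touch it at the same moment.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical Singleton-style wrapper around a shared resource
// (a key/value store standing in for real configuration).
public sealed class SettingsWrap
{
    // The default Lazy<T> mode (ExecutionAndPublication) is thread-safe,
    // so parallel tests all observe the same single instance.
    private static readonly Lazy<SettingsWrap> LazyInstance =
        new Lazy<SettingsWrap>(() => new SettingsWrap());

    public static SettingsWrap Instance => LazyInstance.Value;

    private readonly Dictionary<string, string> _settings = new Dictionary<string, string>();

    private SettingsWrap() { }

    // Test-only knob: exposing extra properties like this is the point of wrapping.
    public bool DryRun { get; set; }

    public void Set(string key, string value)
    {
        lock (_settings) _settings[key] = value;
    }

    // Returns null when the key is absent.
    public string Get(string key)
    {
        lock (_settings) return _settings.TryGetValue(key, out var value) ? value : null;
    }
}
```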

    public static int Main(string[] args)
    {
        var testMethods = new List<ClassMethodInstance>();
        var returnCode = 0;
        Initialize();

        var test = typeof(FactAttribute);

        // Collect every method in this assembly that is marked with [Fact].
        PopulateByAttribute(testMethods, test);

        var timer = Stopwatch.StartNew();
        var startDt = DateTime.Now;
        // Run the tests enabled in the config concurrently; blocks until all have finished.
        //Parallel.ForEach(GetSelectedTests(testMethods), RunClassMethodInstance);
        GetSelectedTests(testMethods).AsyncAwaitAll(RunClassMethodInstance);

        Logger.Info($"Elapsed time: {timer.Elapsed}");

        try
        {
            // Any recorded failure turns the exit code into -1.
            Assert.Empty(FailedTests);
        }
        catch (Exception e)
        {
            Logger.Error(e.Message, e);
            returnCode = -1;
        }

        LogWrap.FlushQueue();
        LogWrap.SendEmail(startDt);

        return returnCode;
    }

A test method would look like this:

    [Fact]
    public static void ValidateElasticSearchState()
    {
        var options = new ParallelOptions{MaxDegreeOfParallelism = 4};

        var tests = from store in ConfigWrap.TestStores 
            from type in ValidTypes 
            where ConfigWrap.ElasticDocumentTestTypes.Contains(type)
            orderby store, type
            select new TestCase(type, store);

        Parallel.ForEach(tests, options, Process);
        //tests.AsyncAwaitAll(Process, TaskCreationOptions.LongRunning);
    }

Some background: ClassMethodInstance is a class I created to reduce the amount of code needed to use Activator.CreateInstance correctly with static, instance, and async methods. PopulateByAttribute() reads the attributes of every method in the same assembly and populates the provided collection with only those that have the specified attribute. GetSelectedTests() then filters that set down to the methods that have a corresponding pattern in the config file flagged as True.
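The filtering step can be sketched roughly as follows, assuming (hypothetically) that the config is loaded into a dictionary that maps name patterns, treated as regular expressions anchored to the full test name, to an enabled flag. TestSelection and this GetSelectedTests signature are illustrative only; the author's version works on ClassMethodInstance objects rather than plain names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class TestSelection
{
    // Hypothetical config shape: pattern -> enabled flag,
    // e.g. { "Validate.*": true, "Audit.*": false }.
    public static List<string> GetSelectedTests(
        IEnumerable<string> testNames,
        IReadOnlyDictionary<string, bool> configPatterns)
    {
        // Only patterns flagged true take part; anchor them so they must match the whole name.
        var enabled = configPatterns
            .Where(kv => kv.Value)
            .Select(kv => new Regex("^(?:" + kv.Key + ")$"))
            .ToList();

        // Keep a test if any enabled pattern matches it; the original order is preserved.
        return testNames.Where(name => enabled.Any(rx => rx.IsMatch(name))).ToList();
    }
}
```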

My question, then, concerns my approach to parallelism. Originally, I used static methods so that Parallel.ForEach could run all tests simultaneously, as far as my computer would allow. As I learned more, I moved to async-await patterns instead of Parallel.ForEach for better use of threads and operations, and I created a generic extension method that uses Task.Factory.StartNew to run my code in the C# async style.

To run an operation when I don't need its result:

    public static void AsyncAwaitAll<T>(this IEnumerable<T> enumerable, Action<T> action, TaskCreationOptions options = TaskCreationOptions.None)
    {
        // Start one task per item, then block until every one of them has finished.
        var tasks = enumerable
            .Select(item => Task.Factory.StartNew(() => action(item), options))
            .ToArray();

        Task.WaitAll(tasks);
    }
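Since the rest of the question is about throttling, it helps to see the count-based baseline first. This is a hedged sketch of an awaitable counterpart to AsyncAwaitAll (the name ForEachThrottledAsync is hypothetical) that caps concurrency with SemaphoreSlim.WaitAsync. Every item costs exactly one slot, which is precisely the limitation a weighted version would have to lift.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExtensions
{
    // Hypothetical awaitable counterpart of AsyncAwaitAll: every item runs, but at
    // most maxConcurrency of them are inside `action` at the same time.
    public static async Task ForEachThrottledAsync<T>(
        this IEnumerable<T> items, Func<T, Task> action, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                // Each item costs exactly one slot, whatever its real resource use.
                await gate.WaitAsync().ConfigureAwait(false);
                try
                {
                    await action(item).ConfigureAwait(false);
                }
                finally
                {
                    gate.Release();
                }
            }).ToArray();

            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }
}
```

Because the work is awaited rather than wrapped in Task.Factory.StartNew, this shape suits I/O-bound calls such as database or search queries; a CPU-bound delegate would still need to be offloaded (for example with Task.Run) inside `action`.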

And if I need the result:

    // Blocks until the task completes; unlike .Result, rethrows the original
    // exception instead of wrapping it in an AggregateException.
    public static T GetAwaitableResult<T>(this Task<T> task) =>
        task.GetAwaiter().GetResult();

    public static TOutput[] AsyncReturnAll<TInput, TOutput>(this IEnumerable<TInput> enumerable, Func<TInput, TOutput> method, TaskCreationOptions options = TaskCreationOptions.None)
    {
        // Start one task per item; WhenAll yields the results in input order.
        var tasks = enumerable
            .Select(item => Task.Factory.StartNew(() => method(item), options))
            .ToArray();

        return Task.WhenAll(tasks).GetAwaitableResult();
    }
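If the per-item work is itself asynchronous (for example, a query against one of the data stores), the same shape can be awaited end to end instead of blocking on GetAwaiter().GetResult(). A minimal sketch; SelectAllAsync is a hypothetical name. Task.WhenAll returns its results in the order of the input tasks, not in completion order, so the output array lines up with the input.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ResultExtensions
{
    // Hypothetical awaitable counterpart of AsyncReturnAll for Task-returning work.
    public static async Task<TOutput[]> SelectAllAsync<TInput, TOutput>(
        this IEnumerable<TInput> items, Func<TInput, Task<TOutput>> method)
    {
        // Start every operation up front; they run concurrently from here on.
        Task<TOutput>[] tasks = items.Select(method).ToArray();

        // Results come back in input order, regardless of which task finished first.
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```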

My problem is that I know some operations will be heavy on memory while others will be fairly lightweight. I want a Semaphore-like class where I can say that one operation will take 10 process units and another will take only 2. I've thought of taking a simple int weight and then, in a for loop, awaiting the shared static Semaphore once per unit, but then I risk one or more higher-weight operations holding up the semaphore while lighter tasks that could otherwise run are left waiting.
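To my knowledge the .NET base library has no weighted semaphore: SemaphoreSlim.Release(int) can return several units at once, but Wait and WaitAsync only ever take one. Taking one unit per loop iteration is what allows two heavy tasks to each hold part of what they need while neither can finish. Below is a minimal sketch of the alternative, assuming a hand-rolled class (WeightedSemaphore, WaitAsync(int), Release(int) and RunAsync are all hypothetical names): each caller requests its whole weight, and the request is granted in one atomic step or not at all, so units are never held partially. On release, waiters are scanned in arrival order and every one that now fits is woken (first fit), which lets light tasks slip past a heavy one that is still waiting.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical weighted semaphore (not a framework type). Callers request `weight`
// units out of a fixed capacity and receive all of them at once, or wait.
public sealed class WeightedSemaphore
{
    private readonly object _lock = new object();
    private readonly LinkedList<(int Weight, TaskCompletionSource<bool> Tcs)> _waiters =
        new LinkedList<(int Weight, TaskCompletionSource<bool> Tcs)>();
    private int _available;

    public WeightedSemaphore(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        Capacity = capacity;
        _available = capacity;
    }

    public int Capacity { get; }

    public int Available
    {
        get { lock (_lock) return _available; }
    }

    public Task WaitAsync(int weight)
    {
        // A weight larger than the capacity could never be granted.
        if (weight <= 0 || weight > Capacity)
            throw new ArgumentOutOfRangeException(nameof(weight));

        lock (_lock)
        {
            if (weight <= _available)
            {
                _available -= weight; // granted immediately, all units at once
                return Task.CompletedTask;
            }

            // Park the caller; its continuation will not run inline inside Release.
            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.AddLast((weight, tcs));
            return tcs.Task;
        }
    }

    public void Release(int weight)
    {
        if (weight <= 0) throw new ArgumentOutOfRangeException(nameof(weight));

        var toWake = new List<TaskCompletionSource<bool>>();
        lock (_lock)
        {
            if (_available + weight > Capacity) throw new SemaphoreFullException();
            _available += weight;

            // First fit in arrival order: grant every waiter whose whole weight now fits.
            var node = _waiters.First;
            while (node != null && _available > 0)
            {
                var next = node.Next;
                if (node.Value.Weight <= _available)
                {
                    _available -= node.Value.Weight;
                    toWake.Add(node.Value.Tcs);
                    _waiters.Remove(node);
                }
                node = next;
            }
        }

        // Complete the granted waiters outside the lock.
        foreach (var tcs in toWake) tcs.SetResult(true);
    }

    // Convenience wrapper: hold `weight` units for the duration of `work`.
    public async Task RunAsync(int weight, Func<Task> work)
    {
        await WaitAsync(weight).ConfigureAwait(false);
        try { await work().ConfigureAwait(false); }
        finally { Release(weight); }
    }
}
```

First fit has a trade-off: a steady stream of light work can keep a heavy request waiting indefinitely. Stopping the scan at the first waiter that does not fit gives strict FIFO order instead, at the price of light tasks queuing behind a heavy one. How weights map to memory is something the caller has to calibrate; with a capacity of 10 and heavy audits at weight 8, for example, only one heavy audit could run at a time, with light work filling the remaining 2 units.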

I've already used LongRunning as an option, and while it helped, it didn't fully solve the problem. The data audit in this application pulls data from two data stores and compares them in memory, and with async-await there can be ten or more of these audits running at the same time, whereas Parallel.ForEach wouldn't run more than eight at once on my machine. Even with Parallel.ForEach, however, I have run out of memory, because over a large enough test set every thread eventually ends up running a heavy operation, since those take the longest to complete.
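One likely reason LongRunning raised the number of simultaneous audits: with the default scheduler, current .NET implementations give a LongRunning task its own dedicated thread instead of a thread-pool thread. The option is documented only as a hint, so treat this as implementation behavior rather than a guarantee. Dedicated threads are not throttled by the pool, so LongRunning on its own does nothing to cap concurrency or memory. The probe below (LongRunningProbe is a hypothetical name) reports where a task actually ran.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningProbe
{
    // Starts a trivial task with the given options on the default scheduler and
    // reports whether it executed on a thread-pool thread.
    public static bool RanOnThreadPool(TaskCreationOptions options) =>
        Task.Factory.StartNew(
                () => Thread.CurrentThread.IsThreadPoolThread,
                CancellationToken.None,
                options,
                TaskScheduler.Default)
            .GetAwaiter()
            .GetResult();
}
```

If the probe reports a dedicated thread for LongRunning, the limit on how many heavy audits hold memory at once has to come from an explicit cap rather than from the scheduler.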
