diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ac53db90..4838354a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,6 +57,7 @@ jobs: run: | dotnet test test/PatternKit.Tests/PatternKit.Tests.csproj \ --configuration Release \ + --framework net8.0 \ --no-build \ -p:TestTfmsInParallel=false \ --collect:"XPlat Code Coverage" \ @@ -65,6 +66,7 @@ jobs: -- DataCollectionRunSettings.DataCollectors.DataCollector.Configuration.Format=cobertura dotnet test test/PatternKit.Generators.Tests/PatternKit.Generators.Tests.csproj \ --configuration Release \ + --framework net8.0 \ --no-build \ -p:TestTfmsInParallel=false \ --collect:"XPlat Code Coverage" \ @@ -73,6 +75,7 @@ jobs: -- DataCollectionRunSettings.DataCollectors.DataCollector.Configuration.Format=cobertura dotnet test test/PatternKit.Hosting.Extensions.Tests/PatternKit.Hosting.Extensions.Tests.csproj \ --configuration Release \ + --framework net8.0 \ --no-build \ -p:TestTfmsInParallel=false \ --collect:"XPlat Code Coverage" \ @@ -81,6 +84,7 @@ jobs: -- DataCollectionRunSettings.DataCollectors.DataCollector.Configuration.Format=cobertura dotnet test test/PatternKit.Examples.Tests/PatternKit.Examples.Tests.csproj \ --configuration Release \ + --framework net8.0 \ --no-build \ -p:TestTfmsInParallel=false \ --collect:"XPlat Code Coverage" \ @@ -198,6 +202,7 @@ jobs: run: | dotnet test test/PatternKit.Tests/PatternKit.Tests.csproj \ --configuration Release \ + --framework net8.0 \ --no-build \ -p:TestTfmsInParallel=false \ --collect:"XPlat Code Coverage" \ @@ -206,6 +211,7 @@ jobs: -- DataCollectionRunSettings.DataCollectors.DataCollector.Configuration.Format=cobertura dotnet test test/PatternKit.Generators.Tests/PatternKit.Generators.Tests.csproj \ --configuration Release \ + --framework net8.0 \ --no-build \ -p:TestTfmsInParallel=false \ --collect:"XPlat Code Coverage" \ @@ -214,6 +220,7 @@ jobs: -- DataCollectionRunSettings.DataCollectors.DataCollector.Configuration.Format=cobertura dotnet test test/PatternKit.Hosting.Extensions.Tests/PatternKit.Hosting.Extensions.Tests.csproj \ --configuration Release \ + --framework net8.0 \ --no-build \ -p:TestTfmsInParallel=false \ --collect:"XPlat Code Coverage" \ @@ -222,6 +229,7 @@ jobs: -- DataCollectionRunSettings.DataCollectors.DataCollector.Configuration.Format=cobertura dotnet test test/PatternKit.Examples.Tests/PatternKit.Examples.Tests.csproj \ --configuration Release \ + --framework net8.0 \ --no-build \ -p:TestTfmsInParallel=false \ --collect:"XPlat Code Coverage" \ diff --git a/README.md b/README.md index e0a44b64..ca4bea5c 100644 --- a/README.md +++ b/README.md @@ -473,7 +473,7 @@ var cachedRemoteProxy = Proxy.Create(id => remoteProxy.Execute(id)) --- ## Patterns Table -PatternKit currently tracks 115 production-readiness patterns. Each catalog pattern is represented in tests, documentation, real-world examples, IoC integration, and the BenchmarkDotNet coverage matrix. +PatternKit currently tracks 116 production-readiness patterns. Each catalog pattern is represented in tests, documentation, real-world examples, IoC integration, and the BenchmarkDotNet coverage matrix. | Category | Count | Patterns | | --- | ---: | --- | @@ -482,7 +482,7 @@ PatternKit currently tracks 115 production-readiness patterns. Each catalog patt | Cloud Architecture | 20 | Ambassador, Backends for Frontends, Bulkhead, Cache-Aside, Cache Stampede Protection, Circuit Breaker, External Configuration Store, Gateway Aggregation, Gateway Routing, Health Endpoint Monitoring, Leader Election, Priority Queue, Queue-Based Load Leveling, Rate Limiting, Read-Through Cache, Retry, Scheduler Agent Supervisor, Sidecar, Strangler Fig, Write-Through Cache | | Creational | 6 | Abstract Factory, Builder, Factory Method, Object Pool, Prototype, Singleton | | Enterprise Integration | 41 | Aggregator, Canonical Data Model, Channel Adapter, Channel Purger, Claim Check, Competing Consumers, Content Enricher, Content-Based Router, Control Bus, Correlation Identifier, Dead Letter Channel, Durable Subscriber, Dynamic Router, Event Notification, Event-Carried State Transfer, Event-Driven Consumer, Guaranteed Delivery, Invalid Message Channel, Mailbox, Message Bus, Message Channel, Message Envelope, Message Expiration, Message Filter, Message History, Message Store, Message Translator, Messaging Bridge, Messaging Gateway, Pipes and Filters, Polling Consumer, Publish-Subscribe, Recipient List, Request-Reply, Resequencer, Routing Slip, Saga / Process Manager, Scatter-Gather, Service Activator, Splitter, Wire Tap | -| Messaging Reliability | 3 | Idempotent Receiver, Inbox, Outbox | +| Messaging Reliability | 4 | Backpressure, Idempotent Receiver, Inbox, Outbox | | Structural | 7 | Adapter, Bridge, Composite, Decorator, Facade, Flyweight, Proxy | ## Benchmark Snapshot @@ -527,6 +527,8 @@ BenchmarkDotNet guidance is documented in [docs/guides/benchmarks.md](docs/guide | Bridge | Execution | 91.848 ns | 664 B | 30.004 ns | 160 B | Generated bridge forwarding was faster and allocated less for notice rendering. | | Bulkhead | Construction | 20.56 ns | 216 B | 20.48 ns | 216 B | Effectively equivalent for this microbenchmark. | | Bulkhead | Execution | 102.70 ns | 592 B | 106.11 ns | 592 B | Same allocation; fluent was slightly faster for the shipping allocation workflow. | +| Backpressure | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | +| Backpressure | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Cache Stampede Protection | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Cache Stampede Protection | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Cache-Aside | Construction | 19.91 ns | 200 B | 19.85 ns | 200 B | Effectively equivalent for this microbenchmark. | diff --git a/benchmarks/PatternKit.Benchmarks/Messaging/BackpressureBenchmarks.cs b/benchmarks/PatternKit.Benchmarks/Messaging/BackpressureBenchmarks.cs new file mode 100644 index 00000000..dc69362a --- /dev/null +++ b/benchmarks/PatternKit.Benchmarks/Messaging/BackpressureBenchmarks.cs @@ -0,0 +1,43 @@ +using BenchmarkDotNet.Attributes; +using PatternKit.Examples.BackpressureDemo; +using PatternKit.Messaging.Reliability.Backpressure; + +namespace PatternKit.Benchmarks.Messaging; + +[BenchmarkCategory("MessagingReliability", "Backpressure")] +public class BackpressureBenchmarks +{ + private static readonly CheckoutWork Work = new("ORDER-100", 42m); + + [Benchmark(Baseline = true, Description = "Fluent: create backpressure policy")] + [BenchmarkCategory("Fluent", "Construction")] + public BackpressurePolicy Fluent_CreatePolicy() + => CheckoutBackpressurePolicies.CreateFluentPolicy(); + + [Benchmark(Description = "Generated: create backpressure policy")] + [BenchmarkCategory("Generated", "Construction")] + public BackpressurePolicy Generated_CreatePolicy() + => GeneratedCheckoutBackpressurePolicy.CreateGeneratedPolicy(); + + [Benchmark(Description = "Fluent: submit checkout work")] + [BenchmarkCategory("Fluent", "Execution")] + public ValueTask Fluent_Submit() + { + var service = new CheckoutBackpressureService( + new ScriptedCheckoutProcessor(new CheckoutAdmission("", true, "accepted")), + CheckoutBackpressurePolicies.CreateFluentPolicy()); + + return service.SubmitAsync(Work); + } + + [Benchmark(Description = "Generated: submit checkout work")] + [BenchmarkCategory("Generated", "Execution")] + public ValueTask Generated_Submit() + { + var service = new CheckoutBackpressureService( + new ScriptedCheckoutProcessor(new CheckoutAdmission("", true, "accepted")), + GeneratedCheckoutBackpressurePolicy.CreateGeneratedPolicy()); + + return service.SubmitAsync(Work); + } +} diff --git a/docs/examples/checkout-backpressure.md b/docs/examples/checkout-backpressure.md new file mode 100644 index 00000000..3b1f8576 --- /dev/null +++ b/docs/examples/checkout-backpressure.md @@ -0,0 +1,20 @@ +# Checkout Backpressure + +The checkout backpressure example shows a production-style admission gate in front of checkout work. It includes a fluent policy, a source-generated policy factory, and an `IServiceCollection` extension that registers the policy and workflow service. + +Import it into a host: + +```csharp +services.AddCheckoutBackpressureDemo(); +``` + +For reusable app-level registration without importing examples: + +```csharp +services.AddPatternKitBackpressurePolicy( + "checkout-backpressure", + builder => builder + .WithCapacity(8) + .WithMode(BackpressureMode.Wait) + .WithWaitTimeout(TimeSpan.FromMilliseconds(50))); +``` diff --git a/docs/examples/index.md b/docs/examples/index.md index 8456a3c9..b29f71d7 100644 --- a/docs/examples/index.md +++ b/docs/examples/index.md @@ -54,6 +54,9 @@ Welcome! This section collects small, focused demos that show **how to compose b * **Customer Notification Null Object** A Generic Host importable notification fallback with fluent and source-generated routes, `IServiceCollection` registration, and deterministic no-op behavior for optional collaborators. See [Customer Notification Null Object](null-object-notification.md). +* **Checkout Backpressure** + A Generic Host importable checkout admission gate with fluent and source-generated routes, `IServiceCollection` registration, and explicit saturation behavior. See [Checkout Backpressure](checkout-backpressure.md). + * **Minimal Web Request Router** A tiny "API gateway" that separates **first-match middleware** (side effects/logging/auth) from **first-match routes** and **content negotiation**. A crisp example of Strategy patterns in an HTTP-ish setting. diff --git a/docs/examples/toc.yml b/docs/examples/toc.yml index c5198b2e..33d851aa 100644 --- a/docs/examples/toc.yml +++ b/docs/examples/toc.yml @@ -286,6 +286,9 @@ - name: Shipping Bulkhead href: shipping-bulkhead.md +- name: Checkout Backpressure + href: checkout-backpressure.md + - name: Fulfillment Queue Load Leveling href: fulfillment-queue-load-leveling.md diff --git a/docs/generators/backpressure.md b/docs/generators/backpressure.md new file mode 100644 index 00000000..2ed43a2b --- /dev/null +++ b/docs/generators/backpressure.md @@ -0,0 +1,21 @@ +# Backpressure Generator + +`[GenerateBackpressurePolicy]` emits a static factory for `BackpressurePolicy` with compile-time configuration for name, capacity, saturation mode, and wait timeout. + +```csharp +[GenerateBackpressurePolicy( + typeof(CheckoutAdmission), + FactoryMethodName = "CreateGeneratedPolicy", + PolicyName = "checkout-backpressure", + Capacity = 8, + Mode = "Wait", + WaitTimeoutMilliseconds = 50)] +public static partial class GeneratedCheckoutBackpressurePolicy; +``` + +Diagnostics: + +- `PKBP001`: the host type must be partial. +- `PKBP002`: capacity and wait timeout must be valid. +- `PKBP003`: the factory method name must be a valid identifier. +- `PKBP004`: mode must be one of `Reject`, `Wait`, `DropNewest`, `DropOldest`, `Shed`, or `Observe`. diff --git a/docs/generators/index.md b/docs/generators/index.md index e68749b6..5b5d60c5 100644 --- a/docs/generators/index.md +++ b/docs/generators/index.md @@ -123,6 +123,7 @@ PatternKit includes a Roslyn incremental generator package (`PatternKit.Generato | [**Routing Slip**](messaging.md#generated-routing-slip) | Ordered message itinerary factories | `[GenerateRoutingSlip]` | | [**Saga**](messaging.md#generated-saga) | Typed process-manager transition factories | `[GenerateSaga]` | | [**Mailbox**](messaging.md#generated-mailbox) | Serialized in-process inbox factories | `[GenerateMailbox]` | +| [**Backpressure**](backpressure.md) | Admission-control policy factories for saturated work boundaries | `[GenerateBackpressurePolicy]` | | [**Reliability Pipeline**](messaging.md#generated-reliability-pipeline) | Idempotent receiver, inbox, and outbox factories | `[GenerateReliabilityPipeline]` | | [**Backplane Topology**](messaging.md#generated-backplane-topology) | Request/reply routes and publish/subscribe endpoint topology | `[GenerateBackplaneTopology]` | @@ -280,6 +281,10 @@ public static partial class OrderLineAggregator { } [GenerateMailbox(typeof(OrderWork), Capacity = 32, BackpressurePolicy = "Wait")] public static partial class OrderMailbox { } +// Backpressure - generated admission-control policy +[GenerateBackpressurePolicy(typeof(CheckoutAdmission), Capacity = 8, Mode = "Wait", WaitTimeoutMilliseconds = 50)] +public static partial class CheckoutBackpressurePolicy { } + // Reliability pipeline - generated idempotent receiver, inbox, and outbox factories [GenerateReliabilityPipeline(typeof(AcceptOrder), typeof(string), typeof(OrderAccepted))] public static partial class OrderReliability { } diff --git a/docs/generators/toc.yml b/docs/generators/toc.yml index 178aeede..7fec36dd 100644 --- a/docs/generators/toc.yml +++ b/docs/generators/toc.yml @@ -28,6 +28,9 @@ - name: Bulkhead href: bulkhead.md +- name: Backpressure + href: backpressure.md + - name: Cache-Aside href: cache-aside.md diff --git a/docs/guides/benchmark-results.md b/docs/guides/benchmark-results.md index b7fc777b..ef18eb68 100644 --- a/docs/guides/benchmark-results.md +++ b/docs/guides/benchmark-results.md @@ -49,6 +49,8 @@ The latest measured timings below were captured on Windows 11, Intel Core i9-149 | Bridge | Execution | 91.848 ns | 664 B | 30.004 ns | 160 B | Generated bridge forwarding was faster and allocated less for notice rendering. | | Bulkhead | Construction | 20.56 ns | 216 B | 20.48 ns | 216 B | Effectively equivalent for this microbenchmark. | | Bulkhead | Execution | 102.70 ns | 592 B | 106.11 ns | 592 B | Same allocation; fluent was slightly faster for the shipping allocation workflow. | +| Backpressure | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | +| Backpressure | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Cache Stampede Protection | Construction | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Cache Stampede Protection | Execution | Pending | Pending | Pending | Pending | Covered by the BenchmarkDotNet matrix; publish measured values after the next benchmark refresh. | | Cache-Aside | Construction | 19.91 ns | 200 B | 19.85 ns | 200 B | Effectively equivalent for this microbenchmark. | @@ -252,7 +254,7 @@ The latest measured timings below were captured on Windows 11, Intel Core i9-149 ## Coverage Matrix Summary -The coverage matrix currently publishes 115 catalog patterns and 460 pattern route results. Each pattern has four BenchmarkDotNet routes: fluent construction, fluent execution, source-generated construction, and source-generated execution. The reusable hosting integration matrix publishes 10 reusable hosting integration route results for package-level `IServiceCollection` registrations. +The coverage matrix currently publishes 116 catalog patterns and 464 pattern route results. Each pattern has four BenchmarkDotNet routes: fluent construction, fluent execution, source-generated construction, and source-generated execution. The reusable hosting integration matrix publishes 11 reusable hosting integration route results for package-level `IServiceCollection` registrations. | Category | Patterns | Published route results | | --- | ---: | ---: | @@ -261,15 +263,16 @@ The coverage matrix currently publishes 115 catalog patterns and 460 pattern rou | Cloud Architecture | 20 | 80 | | Creational | 6 | 24 | | Enterprise Integration | 41 | 164 | -| Messaging Reliability | 3 | 12 | +| Messaging Reliability | 4 | 16 | | Structural | 7 | 28 | -The generator matrix currently publishes 110 generator source route results. +The generator matrix currently publishes 111 generator source route results. ## Hosting Integration Matrix Results | Pattern | Route | Registration | Source | Tests | Docs | | --- | --- | --- | --- | --- | --- | +| Backpressure | `IServiceCollection` | `AddPatternKitBackpressurePolicy` | `src/PatternKit.Hosting.Extensions/DependencyInjection/PatternKitServiceCollectionExtensions.cs` | `test/PatternKit.Hosting.Extensions.Tests/DependencyInjection/PatternKitServiceCollectionExtensionsTests.cs` | `docs/guides/hosting-extensions.md` | | Bulkhead | `IServiceCollection` | `AddPatternKitBulkheadPolicy` | `src/PatternKit.Hosting.Extensions/DependencyInjection/PatternKitServiceCollectionExtensions.cs` | `test/PatternKit.Hosting.Extensions.Tests/DependencyInjection/PatternKitServiceCollectionExtensionsTests.cs` | `docs/guides/hosting-extensions.md` | | Circuit Breaker | `IServiceCollection` | `AddPatternKitCircuitBreakerPolicy` | `src/PatternKit.Hosting.Extensions/DependencyInjection/PatternKitServiceCollectionExtensions.cs` | `test/PatternKit.Hosting.Extensions.Tests/DependencyInjection/PatternKitServiceCollectionExtensionsTests.cs` | `docs/guides/hosting-extensions.md` | | Guaranteed Delivery | `IServiceCollection` | `AddPatternKitGuaranteedDelivery` | `src/PatternKit.Hosting.Extensions/DependencyInjection/PatternKitServiceCollectionExtensions.cs` | `test/PatternKit.Hosting.Extensions.Tests/DependencyInjection/PatternKitServiceCollectionExtensionsTests.cs` | `docs/guides/hosting-extensions.md` | @@ -390,7 +393,8 @@ The generator matrix currently publishes 110 generator source route results. | Enterprise Integration | Service Activator | Covered | Covered | Covered | Covered | | Enterprise Integration | Splitter | Covered | Covered | Covered | Covered | | Enterprise Integration | Wire Tap | Covered | Covered | Covered | Covered | -| Messaging Reliability | Idempotent Receiver | Covered | Covered | Covered | Covered | +| Messaging Reliability | Backpressure | Covered | Covered | Covered | Covered | +| Messaging Reliability | Idempotent Receiver | Covered | Covered | Covered | Covered | | Messaging Reliability | Inbox | Covered | Covered | Covered | Covered | | Messaging Reliability | Outbox | Covered | Covered | Covered | Covered | | Structural | Adapter | Covered | Covered | Covered | Covered | @@ -416,8 +420,9 @@ The generator matrix currently publishes 110 generator source route results. | AmbassadorGenerator | `src/PatternKit.Generators/Ambassador/AmbassadorGenerator.cs` | Covered | | AntiCorruptionLayerGenerator | `src/PatternKit.Generators/AntiCorruption/AntiCorruptionLayerGenerator.cs` | Covered | | AuditLogGenerator | `src/PatternKit.Generators/AuditLog/AuditLogGenerator.cs` | Covered | -| BackendsForFrontendsGenerator | `src/PatternKit.Generators/BackendsForFrontends/BackendsForFrontendsGenerator.cs` | Covered | -| BridgeGenerator | `src/PatternKit.Generators/Bridge/BridgeGenerator.cs` | Covered | +| BackendsForFrontendsGenerator | `src/PatternKit.Generators/BackendsForFrontends/BackendsForFrontendsGenerator.cs` | Covered | +| BackpressurePolicyGenerator | `src/PatternKit.Generators/Backpressure/BackpressurePolicyGenerator.cs` | Covered | +| BridgeGenerator | `src/PatternKit.Generators/Bridge/BridgeGenerator.cs` | Covered | | BuilderGenerator | `src/PatternKit.Generators/Builders/BuilderGenerator.cs` | Covered | | BulkheadPolicyGenerator | `src/PatternKit.Generators/Bulkhead/BulkheadPolicyGenerator.cs` | Covered | | CacheAsidePolicyGenerator | `src/PatternKit.Generators/CacheAside/CacheAsidePolicyGenerator.cs` | Covered | diff --git a/docs/guides/hosting-extensions.md b/docs/guides/hosting-extensions.md index 42ed41f8..d73457ff 100644 --- a/docs/guides/hosting-extensions.md +++ b/docs/guides/hosting-extensions.md @@ -65,6 +65,12 @@ services bulkhead => bulkhead .WithMaxConcurrency(8) .WithMaxQueueLength(32)) + .AddPatternKitBackpressurePolicy( + "inventory-backpressure", + backpressure => backpressure + .WithCapacity(8) + .WithMode(BackpressureMode.Wait) + .WithWaitTimeout(TimeSpan.FromMilliseconds(50))) .AddPatternKitRateLimitPolicy( "inventory-rate-limit", rateLimit => rateLimit @@ -115,6 +121,7 @@ Every catalog pattern is importable through the production example catalog. The | Retry | `AddPatternKitRetryPolicy` | Register named retry policies for synchronous service calls. | | Circuit Breaker | `AddPatternKitCircuitBreakerPolicy` | Register named circuit breakers with shared state. | | Bulkhead | `AddPatternKitBulkheadPolicy` | Register concurrency and queue isolation policies. | +| Backpressure | `AddPatternKitBackpressurePolicy` | Register admission gates for saturated work boundaries. | | Rate Limiting | `AddPatternKitRateLimitPolicy` | Register per-key rate windows. | | Queue-Based Load Leveling | `AddPatternKitQueueLoadLevelingPolicy` | Register queue-backed worker policies. | | Priority Queue | `AddPatternKitPriorityQueue` | Register priority-ordered work queues. | diff --git a/docs/index.md b/docs/index.md index c88e4313..b4587ab3 100644 --- a/docs/index.md +++ b/docs/index.md @@ -66,7 +66,7 @@ if (parser.Execute("123", out var value)) ## 📚 Available Patterns -PatternKit covers 115 production-readiness patterns with fluent APIs, source-generated routes where applicable, IoC integration examples, TinyBDD coverage, and BenchmarkDotNet coverage-matrix validation: +PatternKit covers 116 production-readiness patterns with fluent APIs, source-generated routes where applicable, IoC integration examples, TinyBDD coverage, and BenchmarkDotNet coverage-matrix validation: | Category | Count | Patterns | | --- | ---: | --- | @@ -75,7 +75,7 @@ PatternKit covers 115 production-readiness patterns with fluent APIs, source-gen | Cloud Architecture | 20 | Ambassador, Backends for Frontends, Bulkhead, Cache-Aside, Cache Stampede Protection, Circuit Breaker, External Configuration Store, Gateway Aggregation, Gateway Routing, Health Endpoint Monitoring, Leader Election, Priority Queue, Queue-Based Load Leveling, Rate Limiting, Read-Through Cache, Retry, Scheduler Agent Supervisor, Sidecar, Strangler Fig, Write-Through Cache | | Creational | 6 | Abstract Factory, Builder, Factory Method, Object Pool, Prototype, Singleton | | Enterprise Integration | 41 | Aggregator, Canonical Data Model, Channel Adapter, Channel Purger, Claim Check, Competing Consumers, Content Enricher, Content-Based Router, Control Bus, Correlation Identifier, Dead Letter Channel, Durable Subscriber, Dynamic Router, Event Notification, Event-Carried State Transfer, Event-Driven Consumer, Guaranteed Delivery, Invalid Message Channel, Mailbox, Message Bus, Message Channel, Message Envelope, Message Expiration, Message Filter, Message History, Message Store, Message Translator, Messaging Bridge, Messaging Gateway, Pipes and Filters, Polling Consumer, Publish-Subscribe, Recipient List, Request-Reply, Resequencer, Routing Slip, Saga / Process Manager, Scatter-Gather, Service Activator, Splitter, Wire Tap | -| Messaging Reliability | 3 | Idempotent Receiver, Inbox, Outbox | +| Messaging Reliability | 4 | Backpressure, Idempotent Receiver, Inbox, Outbox | | Structural | 7 | Adapter, Bridge, Composite, Decorator, Facade, Flyweight, Proxy | See [Benchmarks](guides/benchmarks.md) and [Benchmark Results](guides/benchmark-results.md) for published fluent-vs-source-generated timing, allocation snapshots, and the full pattern/generator matrix. diff --git a/docs/patterns/messaging/backpressure.md b/docs/patterns/messaging/backpressure.md new file mode 100644 index 00000000..3feac105 --- /dev/null +++ b/docs/patterns/messaging/backpressure.md @@ -0,0 +1,22 @@ +# Backpressure + +Backpressure protects a busy application boundary by making saturation explicit. `BackpressurePolicy` bounds active work and applies one configured behavior when capacity is exhausted: reject, wait, drop newest, drop oldest, shed, or observe. + +Use the fluent route when the application owns policy composition: + +```csharp +var policy = BackpressurePolicy.Create("checkout-backpressure") + .WithCapacity(8) + .WithMode(BackpressureMode.Wait) + .WithWaitTimeout(TimeSpan.FromMilliseconds(50)) + .Build(); +``` + +Use the generated route when a stable policy belongs to a host type: + +```csharp +[GenerateBackpressurePolicy(typeof(CheckoutAdmission), Mode = "Wait", Capacity = 8)] +public static partial class CheckoutBackpressure; +``` + +The policy is also available through `AddPatternKitBackpressurePolicy` for standard `IServiceCollection` integration. diff --git a/docs/patterns/toc.yml b/docs/patterns/toc.yml index e9e0633c..ac43d1e3 100644 --- a/docs/patterns/toc.yml +++ b/docs/patterns/toc.yml @@ -371,6 +371,8 @@ href: messaging/saga.md - name: Mailbox href: messaging/mailbox.md + - name: Backpressure + href: messaging/backpressure.md - name: Idempotent Receiver, Inbox, and Outbox href: messaging/reliability.md - name: Enterprise Integration Source Generators diff --git a/src/PatternKit.Core/Messaging/Reliability/Backpressure/BackpressurePolicy.cs b/src/PatternKit.Core/Messaging/Reliability/Backpressure/BackpressurePolicy.cs new file mode 100644 index 00000000..5eca1aec --- /dev/null +++ b/src/PatternKit.Core/Messaging/Reliability/Backpressure/BackpressurePolicy.cs @@ -0,0 +1,289 @@ +namespace PatternKit.Messaging.Reliability.Backpressure; + +/// +/// Admission behavior used when a backpressure policy is saturated. +/// +public enum BackpressureMode +{ + Reject, + Wait, + DropNewest, + DropOldest, + Shed, + Observe +} + +/// +/// Outcome returned by a backpressure policy execution. +/// +public sealed class BackpressureResult +{ + public BackpressureResult( + TResult? value, + bool accepted, + bool rejected, + bool dropped, + bool shed, + bool observed, + bool waited, + int activeCount, + int droppedCount) + { + Value = value; + Accepted = accepted; + Rejected = rejected; + Dropped = dropped; + Shed = shed; + Observed = observed; + Waited = waited; + ActiveCount = activeCount; + DroppedCount = droppedCount; + } + + public TResult? Value { get; } + public bool Accepted { get; } + public bool Rejected { get; } + public bool Dropped { get; } + public bool Shed { get; } + public bool Observed { get; } + public bool Waited { get; } + public int ActiveCount { get; } + public int DroppedCount { get; } + + public static BackpressureResult AcceptedResult(TResult value, bool waited, int activeCount, int droppedCount) + => new(value, true, false, false, false, false, waited, activeCount, droppedCount); + + public static BackpressureResult ObservedResult(TResult value, int activeCount, int droppedCount) + => new(value, true, false, false, false, true, false, activeCount, droppedCount); + + public static BackpressureResult RejectedResult(int activeCount, int droppedCount) + => new(default, false, true, false, false, false, false, activeCount, droppedCount); + + public static BackpressureResult DroppedResult(int activeCount, int droppedCount) + => new(default, false, false, true, false, false, false, activeCount, droppedCount); + + public static BackpressureResult ShedResult(int activeCount, int droppedCount) + => new(default, false, false, false, true, false, false, activeCount, droppedCount); +} + +/// +/// Reusable backpressure gate for bounding active work and applying explicit saturation policies. +/// +public sealed class BackpressurePolicy +{ + private readonly SemaphoreSlim _slots; + private int _activeCount; + private int _droppedCount; + + private BackpressurePolicy(string name, int capacity, BackpressureMode mode, TimeSpan waitTimeout) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Backpressure policy name is required.", nameof(name)); + if (capacity < 1) + throw new ArgumentOutOfRangeException(nameof(capacity), capacity, "Backpressure capacity must be at least one."); + if (!IsDefinedMode(mode)) + throw new ArgumentOutOfRangeException(nameof(mode), mode, "Backpressure mode is not valid."); + if (waitTimeout < TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(waitTimeout), waitTimeout, "Backpressure wait timeout cannot be negative."); + + Name = name; + Capacity = capacity; + Mode = mode; + WaitTimeout = waitTimeout; + _slots = new SemaphoreSlim(capacity, capacity); + } + + public string Name { get; } + public int Capacity { get; } + public BackpressureMode Mode { get; } + public TimeSpan WaitTimeout { get; } + public int ActiveCount => Volatile.Read(ref _activeCount); + public int DroppedCount => Volatile.Read(ref _droppedCount); + + public static Builder Create(string name = "backpressure") => new(name); + + public BackpressureResult Execute(Func operation) + { + if (operation is null) + throw new ArgumentNullException(nameof(operation)); + + var lease = Enter(); + if (!lease.Accepted) + return lease.Outcome switch + { + BackpressureOutcome.Dropped => BackpressureResult.DroppedResult(ActiveCount, DroppedCount), + BackpressureOutcome.Shed => BackpressureResult.ShedResult(ActiveCount, DroppedCount), + _ => BackpressureResult.RejectedResult(ActiveCount, DroppedCount) + }; + + try + { + var value = operation(); + return lease.Observed + ? BackpressureResult.ObservedResult(value, ActiveCount, DroppedCount) + : BackpressureResult.AcceptedResult(value, lease.Waited, ActiveCount, DroppedCount); + } + finally + { + if (!lease.Observed) + Exit(); + } + } + + public async ValueTask> ExecuteAsync( + Func> operation, + CancellationToken cancellationToken = default) + { + if (operation is null) + throw new ArgumentNullException(nameof(operation)); + + cancellationToken.ThrowIfCancellationRequested(); + var lease = await EnterAsync(cancellationToken).ConfigureAwait(false); + if (!lease.Accepted) + return lease.Outcome switch + { + BackpressureOutcome.Dropped => BackpressureResult.DroppedResult(ActiveCount, DroppedCount), + BackpressureOutcome.Shed => BackpressureResult.ShedResult(ActiveCount, DroppedCount), + _ => BackpressureResult.RejectedResult(ActiveCount, DroppedCount) + }; + + try + { + var value = await operation(cancellationToken).ConfigureAwait(false); + return lease.Observed + ? BackpressureResult.ObservedResult(value, ActiveCount, DroppedCount) + : BackpressureResult.AcceptedResult(value, lease.Waited, ActiveCount, DroppedCount); + } + finally + { + if (!lease.Observed) + Exit(); + } + } + + private BackpressureLease Enter() + { + if (_slots.Wait(0)) + return Start(waited: false); + + return Mode switch + { + BackpressureMode.Wait => WaitForSlot(), + BackpressureMode.DropNewest => Drop(BackpressureOutcome.Dropped), + BackpressureMode.DropOldest => Drop(BackpressureOutcome.Dropped), + BackpressureMode.Shed => BackpressureLease.Denied(BackpressureOutcome.Shed), + BackpressureMode.Observe => BackpressureLease.CreateObserved(), + _ => BackpressureLease.Denied(BackpressureOutcome.Rejected) + }; + } + + private async ValueTask EnterAsync(CancellationToken cancellationToken) + { + if (await _slots.WaitAsync(0, cancellationToken).ConfigureAwait(false)) + return Start(waited: false); + + return Mode switch + { + BackpressureMode.Wait => await WaitForSlotAsync(cancellationToken).ConfigureAwait(false), + BackpressureMode.DropNewest => Drop(BackpressureOutcome.Dropped), + BackpressureMode.DropOldest => Drop(BackpressureOutcome.Dropped), + BackpressureMode.Shed => BackpressureLease.Denied(BackpressureOutcome.Shed), + BackpressureMode.Observe => BackpressureLease.CreateObserved(), + _ => BackpressureLease.Denied(BackpressureOutcome.Rejected) + }; + } + + private BackpressureLease WaitForSlot() + => _slots.Wait(WaitTimeout) + ? Start(waited: true) + : BackpressureLease.Denied(BackpressureOutcome.Rejected); + + private async ValueTask WaitForSlotAsync(CancellationToken cancellationToken) + => await _slots.WaitAsync(WaitTimeout, cancellationToken).ConfigureAwait(false) + ? Start(waited: true) + : BackpressureLease.Denied(BackpressureOutcome.Rejected); + + private BackpressureLease Start(bool waited) + { + Interlocked.Increment(ref _activeCount); + return BackpressureLease.CreateAccepted(waited); + } + + private BackpressureLease Drop(BackpressureOutcome outcome) + { + Interlocked.Increment(ref _droppedCount); + return BackpressureLease.Denied(outcome); + } + + private void Exit() + { + Interlocked.Decrement(ref _activeCount); + _slots.Release(); + } + + private static bool IsDefinedMode(BackpressureMode mode) + => mode is BackpressureMode.Reject + or BackpressureMode.Wait + or BackpressureMode.DropNewest + or BackpressureMode.DropOldest + or BackpressureMode.Shed + or BackpressureMode.Observe; + + public sealed class Builder + { + private readonly string _name; + private int _capacity = 8; + private BackpressureMode _mode = BackpressureMode.Reject; + private TimeSpan _waitTimeout = TimeSpan.Zero; + + internal Builder(string name) => _name = name; + + public Builder WithCapacity(int capacity) + { + _capacity = capacity; + return this; + } + + public Builder WithMode(BackpressureMode mode) + { + _mode = mode; + return this; + } + + public Builder WithWaitTimeout(TimeSpan waitTimeout) + { + _waitTimeout = waitTimeout; + return this; + } + + public BackpressurePolicy Build() + => new(_name, _capacity, _mode, _waitTimeout); + } + + private enum BackpressureOutcome + { + Rejected, + Dropped, + Shed + } + + private readonly struct BackpressureLease + { + private BackpressureLease(bool accepted, bool waited, bool observed, BackpressureOutcome outcome) + { + Accepted = accepted; + Waited = waited; + Observed = observed; + Outcome = outcome; + } + + public bool Accepted { get; } + public bool Waited { get; } + public bool Observed { get; } + public BackpressureOutcome Outcome { get; } + + public static BackpressureLease CreateAccepted(bool waited) => new(true, waited, false, BackpressureOutcome.Rejected); + public static BackpressureLease CreateObserved() => new(true, false, true, BackpressureOutcome.Rejected); + public static BackpressureLease Denied(BackpressureOutcome outcome) => new(false, false, false, outcome); + } +} diff --git a/src/PatternKit.Examples/BackpressureDemo/CheckoutBackpressureDemo.cs b/src/PatternKit.Examples/BackpressureDemo/CheckoutBackpressureDemo.cs new file mode 100644 index 00000000..26651af9 --- /dev/null +++ b/src/PatternKit.Examples/BackpressureDemo/CheckoutBackpressureDemo.cs @@ -0,0 +1,67 @@ +using Microsoft.Extensions.DependencyInjection; +using PatternKit.Generators.Backpressure; +using PatternKit.Messaging.Reliability.Backpressure; + +namespace PatternKit.Examples.BackpressureDemo; + +public sealed record CheckoutWork(string OrderId, decimal Total); +public sealed record CheckoutAdmission(string OrderId, bool Accepted, string Reason); + +public interface ICheckoutProcessor +{ + ValueTask ProcessAsync(CheckoutWork work, CancellationToken cancellationToken = default); +} + +public sealed class ScriptedCheckoutProcessor(CheckoutAdmission admission) : ICheckoutProcessor +{ + public ValueTask ProcessAsync(CheckoutWork work, CancellationToken cancellationToken = default) + => new(admission with { OrderId = work.OrderId }); +} + +public sealed class CheckoutBackpressureService( + ICheckoutProcessor processor, + BackpressurePolicy policy) +{ + public async ValueTask SubmitAsync(CheckoutWork work, CancellationToken cancellationToken = default) + { + var result = await policy.ExecuteAsync( + async ct => await processor.ProcessAsync(work, ct).ConfigureAwait(false), + cancellationToken).ConfigureAwait(false); + + if (result.Accepted && result.Value is not null) + return result.Value; + + var reason = result.Shed ? "shed" : result.Dropped ? "dropped" : "rejected"; + return new CheckoutAdmission(work.OrderId, false, reason); + } +} + +public static class CheckoutBackpressurePolicies +{ + public static BackpressurePolicy CreateFluentPolicy() + => BackpressurePolicy.Create("checkout-backpressure") + .WithCapacity(8) + .WithMode(BackpressureMode.Wait) + .WithWaitTimeout(TimeSpan.FromMilliseconds(50)) + .Build(); +} + +[GenerateBackpressurePolicy( + typeof(CheckoutAdmission), + FactoryMethodName = nameof(CreateGeneratedPolicy), + PolicyName = "checkout-backpressure", + Capacity = 8, + Mode = "Wait", + WaitTimeoutMilliseconds = 50)] +public static partial class GeneratedCheckoutBackpressurePolicy; + +public static class CheckoutBackpressureServiceCollectionExtensions +{ + public static IServiceCollection AddCheckoutBackpressureDemo(this IServiceCollection services) + { + services.AddSingleton(_ => new ScriptedCheckoutProcessor(new CheckoutAdmission("", true, "accepted"))); + services.AddSingleton(_ => GeneratedCheckoutBackpressurePolicy.CreateGeneratedPolicy()); + services.AddSingleton(); + return services; + } +} diff --git a/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs b/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs index cd83871f..c8890eed 100644 --- a/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs +++ b/src/PatternKit.Examples/DependencyInjection/PatternKitExampleServiceCollectionExtensions.cs @@ -36,6 +36,7 @@ using PatternKit.Examples.AsyncStateDemo; using PatternKit.Examples.AuditLogDemo; using PatternKit.Examples.BackendsForFrontendsDemo; +using PatternKit.Examples.BackpressureDemo; using PatternKit.Examples.BoundedContextDemo; using PatternKit.Examples.BulkheadDemo; using PatternKit.Examples.CacheAsideDemo; @@ -111,6 +112,7 @@ using PatternKit.Messaging.Gateways; using PatternKit.Messaging.PipesAndFilters; using PatternKit.Messaging.Reliability; +using PatternKit.Messaging.Reliability.Backpressure; using PatternKit.Messaging.Routing; using PatternKit.Messaging.Storage; using PatternKit.Messaging.Transformation; @@ -237,6 +239,7 @@ public sealed record PrototypeGameCharacterFactoryExample(Prototype RemoteProxy, Proxy<(string To, string Subject, string Body), bool> EmailProxy); public sealed record FlyweightGlyphCacheExample(Func> RenderSentence); public sealed record CustomerNotificationNullObjectExample(NullObject Fallback, ICustomerNotificationChannel Channel, CustomerNotificationWorkflow Workflow); +public sealed record CheckoutBackpressureExample(BackpressurePolicy Policy, CheckoutBackpressureService Service); public sealed record TextEditorMementoExample(MementoDemo.MementoDemo.TextEditor Editor); public sealed record ObserverEventHubExample(EventHub Hub); public sealed record ReactiveViewModelExample(ProfileViewModel ViewModel); @@ -372,6 +375,7 @@ public static IServiceCollection AddPatternKitExamples(this IServiceCollection s .AddInventoryRetryExample() .AddFulfillmentCircuitBreakerExample() .AddShippingBulkheadExample() + .AddCheckoutBackpressureExample() .AddFulfillmentQueueLoadLevelingExample() .AddFulfillmentHealthEndpointExample() .AddFulfillmentPriorityQueueExample() @@ -1220,6 +1224,15 @@ public static IServiceCollection AddShippingBulkheadExample(this IServiceCollect return services.RegisterExample("Shipping Bulkhead", ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection); } + public static IServiceCollection AddCheckoutBackpressureExample(this IServiceCollection services) + { + services.AddCheckoutBackpressureDemo(); + services.AddSingleton(sp => new( + sp.GetRequiredService>(), + sp.GetRequiredService())); + return services.RegisterExample("Checkout Backpressure", ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost); + } + public static IServiceCollection AddDashboardActivityTrackerExample(this IServiceCollection services) { services.AddDashboardActivityTrackerDemo(); diff --git a/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs b/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs index ccab01f4..e97ae6e6 100644 --- a/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs +++ b/src/PatternKit.Examples/ProductionReadiness/PatternKitExampleCatalog.cs @@ -848,6 +848,14 @@ public sealed class PatternKitExampleCatalog : IPatternKitExampleCatalog ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection, ["Bulkhead"], ["concurrency isolation", "source-generated policy factory", "DI composition"]), + Descriptor( + "Checkout Backpressure", + "src/PatternKit.Examples/BackpressureDemo/CheckoutBackpressureDemo.cs", + "test/PatternKit.Examples.Tests/BackpressureDemo/CheckoutBackpressureDemoTests.cs", + "docs/examples/checkout-backpressure.md", + ExampleIntegrationSurface.LibraryOnly | ExampleIntegrationSurface.SourceGenerator | ExampleIntegrationSurface.DependencyInjection | ExampleIntegrationSurface.GenericHost, + ["Backpressure"], + ["saturation admission policy", "source-generated policy factory", "DI composition"]), Descriptor( "Fulfillment Queue Load Leveling", "src/PatternKit.Examples/QueueLoadLevelingDemo/FulfillmentQueueLoadLevelingDemo.cs", diff --git a/src/PatternKit.Examples/ProductionReadiness/PatternKitHostingIntegrationCatalog.cs b/src/PatternKit.Examples/ProductionReadiness/PatternKitHostingIntegrationCatalog.cs index cd591afa..0ec2d0dc 100644 --- a/src/PatternKit.Examples/ProductionReadiness/PatternKitHostingIntegrationCatalog.cs +++ b/src/PatternKit.Examples/ProductionReadiness/PatternKitHostingIntegrationCatalog.cs @@ -60,6 +60,7 @@ public sealed class PatternKitHostingIntegrationCatalog : IPatternKitHostingInte ["Rate Limiting"] = "AddPatternKitRateLimitPolicy", ["Queue-Based Load Leveling"] = "AddPatternKitQueueLoadLevelingPolicy", ["Priority Queue"] = "AddPatternKitPriorityQueue", + ["Backpressure"] = "AddPatternKitBackpressurePolicy", ["Null Object"] = "AddPatternKitNullObject" }; diff --git a/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs b/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs index d8df641d..4412d205 100644 --- a/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs +++ b/src/PatternKit.Examples/ProductionReadiness/PatternKitPatternCatalog.cs @@ -896,6 +896,19 @@ public sealed class PatternKitPatternCatalog : IPatternKitPatternCatalog "test/PatternKit.Examples.Tests/Messaging/MailboxExampleTests.cs", ["fluent serialized inbox", "generated mailbox", "DI-importable bounded worker example"]), + Pattern("Backpressure", PatternFamily.MessagingReliability, + "docs/patterns/messaging/backpressure.md", + "src/PatternKit.Core/Messaging/Reliability/Backpressure/BackpressurePolicy.cs", + "test/PatternKit.Tests/Messaging/Reliability/Backpressure/BackpressurePolicyTests.cs", + "docs/generators/backpressure.md", + "src/PatternKit.Generators/Backpressure/BackpressurePolicyGenerator.cs", + "test/PatternKit.Generators.Tests/BackpressurePolicyGeneratorTests.cs", + null, + "docs/examples/checkout-backpressure.md", + "src/PatternKit.Examples/BackpressureDemo/CheckoutBackpressureDemo.cs", + "test/PatternKit.Examples.Tests/BackpressureDemo/CheckoutBackpressureDemoTests.cs", + ["fluent admission control", "generated backpressure policy factory", "DI-importable checkout saturation example"]), + Pattern("Idempotent Receiver", PatternFamily.MessagingReliability, "docs/patterns/messaging/reliability.md", "src/PatternKit.Core/Messaging/Reliability/IdempotentReceiver.cs", diff --git a/src/PatternKit.Generators.Abstractions/Backpressure/BackpressureAttributes.cs b/src/PatternKit.Generators.Abstractions/Backpressure/BackpressureAttributes.cs new file mode 100644 index 00000000..32def8fe --- /dev/null +++ b/src/PatternKit.Generators.Abstractions/Backpressure/BackpressureAttributes.cs @@ -0,0 +1,12 @@ +namespace PatternKit.Generators.Backpressure; + +[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct, Inherited = false)] +public sealed class GenerateBackpressurePolicyAttribute(Type resultType) : Attribute +{ + public Type ResultType { get; } = resultType ?? throw new ArgumentNullException(nameof(resultType)); + public string FactoryMethodName { get; set; } = "Create"; + public string PolicyName { get; set; } = "backpressure"; + public int Capacity { get; set; } = 8; + public string Mode { get; set; } = "Reject"; + public int WaitTimeoutMilliseconds { get; set; } +} diff --git a/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md b/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md index b4493ce3..5c99db14 100644 --- a/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md +++ b/src/PatternKit.Generators/AnalyzerReleases.Unshipped.md @@ -454,3 +454,7 @@ PKSCP001 | PatternKit.Generators.SnapshotCheckpoints | Error | Snapshot checkpoi PKSCP002 | PatternKit.Generators.SnapshotCheckpoints | Error | Snapshot checkpoint manager configuration is invalid. PKECM001 | PatternKit.Generators.EventualConsistency | Error | Eventual consistency monitor host must be partial. PKECM002 | PatternKit.Generators.EventualConsistency | Error | Eventual consistency monitor configuration is invalid. +PKBP001 | PatternKit.Generators.Backpressure | Error | Backpressure policy host must be partial. +PKBP002 | PatternKit.Generators.Backpressure | Error | Backpressure policy configuration is invalid. +PKBP003 | PatternKit.Generators.Backpressure | Error | Backpressure factory method name is invalid. +PKBP004 | PatternKit.Generators.Backpressure | Error | Backpressure mode is invalid. diff --git a/src/PatternKit.Generators/Backpressure/BackpressurePolicyGenerator.cs b/src/PatternKit.Generators/Backpressure/BackpressurePolicyGenerator.cs new file mode 100644 index 00000000..82027a63 --- /dev/null +++ b/src/PatternKit.Generators/Backpressure/BackpressurePolicyGenerator.cs @@ -0,0 +1,212 @@ +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Microsoft.CodeAnalysis; +using Microsoft.CodeAnalysis.CSharp; +using Microsoft.CodeAnalysis.CSharp.Syntax; +using Microsoft.CodeAnalysis.Text; + +namespace PatternKit.Generators.Backpressure; + +[Generator] +public sealed class BackpressurePolicyGenerator : IIncrementalGenerator +{ + private const string GenerateBackpressurePolicyAttributeName = "PatternKit.Generators.Backpressure.GenerateBackpressurePolicyAttribute"; + + private static readonly SymbolDisplayFormat TypeFormat = new( + globalNamespaceStyle: SymbolDisplayGlobalNamespaceStyle.Included, + typeQualificationStyle: SymbolDisplayTypeQualificationStyle.NameAndContainingTypesAndNamespaces, + genericsOptions: SymbolDisplayGenericsOptions.IncludeTypeParameters, + miscellaneousOptions: SymbolDisplayMiscellaneousOptions.IncludeNullableReferenceTypeModifier | SymbolDisplayMiscellaneousOptions.UseSpecialTypes); + + private static readonly DiagnosticDescriptor MustBePartial = new( + "PKBP001", + "Backpressure policy host must be partial", + "Type '{0}' is marked with [GenerateBackpressurePolicy] but is not declared as partial", + "PatternKit.Generators.Backpressure", + DiagnosticSeverity.Error, + true); + + private static readonly DiagnosticDescriptor InvalidConfiguration = new( + "PKBP002", + "Backpressure policy configuration is invalid", + "Backpressure policy '{0}' must have Capacity >= 1 and WaitTimeoutMilliseconds >= 0", + "PatternKit.Generators.Backpressure", + DiagnosticSeverity.Error, + true); + + private static readonly DiagnosticDescriptor InvalidFactoryMethodName = new( + "PKBP003", + "Backpressure factory method name is invalid", + "Backpressure policy '{0}' has an invalid factory method name '{1}'", + "PatternKit.Generators.Backpressure", + DiagnosticSeverity.Error, + true); + + private static readonly DiagnosticDescriptor InvalidMode = new( + "PKBP004", + "Backpressure mode is invalid", + "Backpressure policy '{0}' has an invalid mode '{1}'", + "PatternKit.Generators.Backpressure", + DiagnosticSeverity.Error, + true); + + public void Initialize(IncrementalGeneratorInitializationContext context) + { + var candidates = context.SyntaxProvider.ForAttributeWithMetadataName( + GenerateBackpressurePolicyAttributeName, + static (node, _) => node is TypeDeclarationSyntax, + static (ctx, _) => (Type: (INamedTypeSymbol)ctx.TargetSymbol, Node: (TypeDeclarationSyntax)ctx.TargetNode, Attributes: ctx.Attributes)); + + context.RegisterSourceOutput(candidates, static (spc, candidate) => + { + var attr = candidate.Attributes.FirstOrDefault(static a => + a.AttributeClass?.ToDisplayString() == GenerateBackpressurePolicyAttributeName); + if (attr is not null) + Generate(spc, candidate.Type, candidate.Node, attr); + }); + } + + private static void Generate(SourceProductionContext context, INamedTypeSymbol type, TypeDeclarationSyntax node, AttributeData attribute) + { + if (!node.Modifiers.Any(static modifier => modifier.Text == "partial")) + { + context.ReportDiagnostic(Diagnostic.Create(MustBePartial, node.Identifier.GetLocation(), type.Name)); + return; + } + + var resultType = attribute.ConstructorArguments.Length >= 1 + ? attribute.ConstructorArguments[0].Value as INamedTypeSymbol + : null; + if (resultType is null) + return; + + var capacity = GetNamedInt(attribute, "Capacity") ?? 8; + var waitTimeoutMilliseconds = GetNamedInt(attribute, "WaitTimeoutMilliseconds") ?? 0; + if (capacity < 1 || waitTimeoutMilliseconds < 0) + { + context.ReportDiagnostic(Diagnostic.Create(InvalidConfiguration, node.Identifier.GetLocation(), type.Name)); + return; + } + + var factoryMethodName = GetNamedString(attribute, "FactoryMethodName") ?? "Create"; + if (!IsIdentifier(factoryMethodName)) + { + context.ReportDiagnostic(Diagnostic.Create(InvalidFactoryMethodName, node.Identifier.GetLocation(), type.Name, factoryMethodName)); + return; + } + + var policyName = GetNamedString(attribute, "PolicyName") ?? "backpressure"; + var mode = GetNamedString(attribute, "Mode") ?? "Reject"; + if (!IsKnownMode(mode)) + { + context.ReportDiagnostic(Diagnostic.Create(InvalidMode, node.Identifier.GetLocation(), type.Name, mode)); + return; + } + + context.AddSource($"{type.Name}.BackpressurePolicy.g.cs", SourceText.From( + GenerateSource(type, resultType, factoryMethodName, policyName, capacity, mode, waitTimeoutMilliseconds), + Encoding.UTF8)); + } + + private static string GenerateSource( + INamedTypeSymbol type, + INamedTypeSymbol resultType, + string factoryMethodName, + string policyName, + int capacity, + string mode, + int waitTimeoutMilliseconds) + { + var ns = type.ContainingNamespace.IsGlobalNamespace ? null : type.ContainingNamespace.ToDisplayString(); + var resultTypeName = resultType.ToDisplayString(TypeFormat); + var sb = new StringBuilder(); + sb.AppendLine("// "); + sb.AppendLine("#nullable enable"); + sb.AppendLine(); + if (ns is not null) + { + sb.Append("namespace ").Append(ns).AppendLine(";"); + sb.AppendLine(); + } + + var containingTypes = GetContainingTypes(type); + var indentLevel = 0; + foreach (var containingType in containingTypes) + { + AppendTypeDeclaration(sb, containingType, indentLevel); + sb.AppendLine(); + sb.AppendLine(new string(' ', indentLevel * 4) + "{"); + indentLevel++; + } + + AppendTypeDeclaration(sb, type, indentLevel); + sb.AppendLine(); + var indent = new string(' ', indentLevel * 4); + sb.AppendLine(indent + "{"); + var memberIndent = indent + " "; + var bodyIndent = memberIndent + " "; + sb.Append(memberIndent).Append("public static global::PatternKit.Messaging.Reliability.Backpressure.BackpressurePolicy<").Append(resultTypeName).Append("> ").Append(factoryMethodName).AppendLine("()"); + sb.AppendLine(memberIndent + "{"); + sb.Append(bodyIndent).Append("return global::PatternKit.Messaging.Reliability.Backpressure.BackpressurePolicy<").Append(resultTypeName).Append(">.Create(\"").Append(Escape(policyName)).AppendLine("\")"); + sb.Append(bodyIndent).Append(" .WithCapacity(").Append(capacity).AppendLine(")"); + sb.Append(bodyIndent).Append(" .WithMode(global::PatternKit.Messaging.Reliability.Backpressure.BackpressureMode.").Append(mode).AppendLine(")"); + sb.Append(bodyIndent).Append(" .WithWaitTimeout(global::System.TimeSpan.FromMilliseconds(").Append(waitTimeoutMilliseconds).AppendLine("))"); + sb.Append(bodyIndent).AppendLine(" .Build();"); + sb.AppendLine(memberIndent + "}"); + sb.AppendLine(indent + "}"); + for (var i = containingTypes.Length - 1; i >= 0; i--) + sb.AppendLine(new string(' ', i * 4) + "}"); + + return sb.ToString(); + } + + private static INamedTypeSymbol[] GetContainingTypes(INamedTypeSymbol type) + { + var containingTypes = new Stack(); + for (var current = type.ContainingType; current is not null; current = current.ContainingType) + containingTypes.Push(current); + + return containingTypes.ToArray(); + } + + private static void AppendTypeDeclaration(StringBuilder sb, INamedTypeSymbol type, int indentLevel) + { + sb.Append(new string(' ', indentLevel * 4)); + sb.Append(GetAccessibility(type.DeclaredAccessibility)).Append(' '); + if (type.IsStatic) + sb.Append("static "); + else if (type.IsAbstract && type.TypeKind == TypeKind.Class) + sb.Append("abstract "); + else if (type.IsSealed && type.TypeKind == TypeKind.Class) + sb.Append("sealed "); + sb.Append("partial ").Append(type.TypeKind == TypeKind.Struct ? "struct" : "class").Append(' ').Append(type.Name); + } + + private static bool IsIdentifier(string value) + => SyntaxFacts.IsValidIdentifier(value) && SyntaxFacts.GetKeywordKind(value) == SyntaxKind.None; + + private static bool IsKnownMode(string value) + => value is "Reject" or "Wait" or "DropNewest" or "DropOldest" or "Shed" or "Observe"; + + private static string Escape(string value) => value.Replace("\\", "\\\\").Replace("\"", "\\\""); + + private static string GetAccessibility(Accessibility accessibility) + => accessibility switch + { + Accessibility.Public => "public", + Accessibility.Internal => "internal", + Accessibility.Private => "private", + Accessibility.Protected => "protected", + Accessibility.ProtectedAndInternal => "private protected", + Accessibility.ProtectedOrInternal => "protected internal", + _ => "internal" + }; + + private static string? GetNamedString(AttributeData attribute, string name) + => attribute.NamedArguments.FirstOrDefault(kv => kv.Key == name).Value.Value as string; + + private static int? GetNamedInt(AttributeData attribute, string name) + => attribute.NamedArguments.FirstOrDefault(kv => kv.Key == name).Value.Value as int?; + +} diff --git a/src/PatternKit.Hosting.Extensions/DependencyInjection/PatternKitServiceCollectionExtensions.cs b/src/PatternKit.Hosting.Extensions/DependencyInjection/PatternKitServiceCollectionExtensions.cs index 3546fc36..0279ec49 100644 --- a/src/PatternKit.Hosting.Extensions/DependencyInjection/PatternKitServiceCollectionExtensions.cs +++ b/src/PatternKit.Hosting.Extensions/DependencyInjection/PatternKitServiceCollectionExtensions.cs @@ -8,6 +8,7 @@ using PatternKit.Cloud.Retry; using PatternKit.Messaging.Channels; using PatternKit.Messaging.Reliability; +using PatternKit.Messaging.Reliability.Backpressure; using PatternKit.Messaging.Storage; namespace PatternKit.Hosting.DependencyInjection; @@ -193,6 +194,25 @@ public static IServiceCollection AddPatternKitPriorityQueue( }); } + public static IServiceCollection AddPatternKitBackpressurePolicy( + this IServiceCollection services, + string name = "backpressure", + Action.Builder>? configure = null, + ServiceLifetime lifetime = ServiceLifetime.Singleton) + { + if (services is null) + throw new ArgumentNullException(nameof(services)); + + return services.AddPatternKitService( + lifetime, + _ => + { + var builder = BackpressurePolicy.Create(name); + configure?.Invoke(builder); + return builder.Build(); + }); + } + public static IServiceCollection AddPatternKitNullObject( this IServiceCollection services, TContract instance, diff --git a/test/PatternKit.Examples.Tests/BackpressureDemo/CheckoutBackpressureDemoTests.cs b/test/PatternKit.Examples.Tests/BackpressureDemo/CheckoutBackpressureDemoTests.cs new file mode 100644 index 00000000..c3489264 --- /dev/null +++ b/test/PatternKit.Examples.Tests/BackpressureDemo/CheckoutBackpressureDemoTests.cs @@ -0,0 +1,79 @@ +using Microsoft.Extensions.DependencyInjection; +using PatternKit.Examples.BackpressureDemo; +using PatternKit.Messaging.Reliability.Backpressure; +using TinyBDD; + +namespace PatternKit.Examples.Tests.BackpressureDemo; + +public sealed class CheckoutBackpressureDemoTests +{ + [Scenario("Checkout backpressure accepts work through fluent and generated policies")] + [Fact] + public async Task Checkout_Backpressure_Accepts_Work_Through_Fluent_And_Generated_Policies() + { + var work = new CheckoutWork("ORDER-100", 42m); + var processor = new ScriptedCheckoutProcessor(new CheckoutAdmission("", true, "accepted")); + var fluent = new CheckoutBackpressureService(processor, CheckoutBackpressurePolicies.CreateFluentPolicy()); + var generated = new CheckoutBackpressureService(processor, GeneratedCheckoutBackpressurePolicy.CreateGeneratedPolicy()); + + var fluentResult = await fluent.SubmitAsync(work); + var generatedResult = await generated.SubmitAsync(work); + + ScenarioExpect.True(fluentResult.Accepted); + ScenarioExpect.True(generatedResult.Accepted); + ScenarioExpect.Equal("ORDER-100", generatedResult.OrderId); + } + + [Scenario("Checkout backpressure is importable through IServiceCollection")] + [Fact] + public async Task Checkout_Backpressure_Is_Importable_Through_ServiceCollection() + { + using var provider = new ServiceCollection() + .AddCheckoutBackpressureDemo() + .BuildServiceProvider(); + + var policy = provider.GetRequiredService>(); + var service = provider.GetRequiredService(); + var result = await service.SubmitAsync(new CheckoutWork("ORDER-200", 99m)); + + ScenarioExpect.Equal("checkout-backpressure", policy.Name); + ScenarioExpect.True(result.Accepted); + ScenarioExpect.Equal("accepted", result.Reason); + } + + [Scenario("Checkout backpressure maps dropped and shed work to domain admissions")] + [Theory] + [InlineData(BackpressureMode.DropNewest, "dropped")] + [InlineData(BackpressureMode.Shed, "shed")] + [InlineData(BackpressureMode.Reject, "rejected")] + public async Task Checkout_Backpressure_Maps_Saturated_Work_To_Domain_Admissions(BackpressureMode mode, string reason) + { + var release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var entered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var processor = new BlockingCheckoutProcessor(entered, release); + var policy = BackpressurePolicy.Create("checkout") + .WithCapacity(1) + .WithMode(mode) + .Build(); + var service = new CheckoutBackpressureService(processor, policy); + + var first = service.SubmitAsync(new CheckoutWork("ORDER-300", 10m)); + await entered.Task; + var saturated = await service.SubmitAsync(new CheckoutWork("ORDER-301", 10m)); + release.SetResult(); + _ = await first; + + ScenarioExpect.False(saturated.Accepted); + ScenarioExpect.Equal(reason, saturated.Reason); + } + + private sealed class BlockingCheckoutProcessor(TaskCompletionSource entered, TaskCompletionSource release) : ICheckoutProcessor + { + public async ValueTask ProcessAsync(CheckoutWork work, CancellationToken cancellationToken = default) + { + entered.SetResult(); + await release.Task.WaitAsync(cancellationToken); + return new CheckoutAdmission(work.OrderId, true, "accepted"); + } + } +} diff --git a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitBenchmarkCoverageTests.cs b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitBenchmarkCoverageTests.cs index f18e6a0f..50fe2c81 100644 --- a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitBenchmarkCoverageTests.cs +++ b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitBenchmarkCoverageTests.cs @@ -107,7 +107,7 @@ public Task Published_Benchmark_Results_Include_Every_Catalog_Pattern() .Then("every catalog pattern appears in the benchmark results matrix", ctx => ScenarioExpect.Empty(ctx.MissingPatterns)) .And("the guide publishes the route result total", ctx => - ScenarioExpect.Contains("460 pattern route results", ctx.ResultsGuide)) + ScenarioExpect.Contains("464 pattern route results", ctx.ResultsGuide)) .AssertPassed(); [Scenario("Published benchmark results include reusable hosting integrations")] diff --git a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs index 3826ebdd..f8e7480f 100644 --- a/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs +++ b/test/PatternKit.Examples.Tests/ProductionReadiness/PatternKitPatternCatalogTests.cs @@ -77,6 +77,7 @@ public sealed class PatternKitPatternCatalogTests(ITestOutputHelper output) : Ti "Routing Slip", "Saga / Process Manager", "Mailbox", + "Backpressure", "Idempotent Receiver", "Inbox", "Outbox", @@ -171,7 +172,7 @@ public Task Catalog_Includes_Enterprise_Integration_And_Architecture_Patterns() .And("enterprise entries are grouped by integration reliability and architecture families", patterns => { ScenarioExpect.Equal(41, patterns.Count(static p => p.Family == PatternFamily.EnterpriseIntegration)); - ScenarioExpect.Equal(3, patterns.Count(static p => p.Family == PatternFamily.MessagingReliability)); + ScenarioExpect.Equal(4, patterns.Count(static p => p.Family == PatternFamily.MessagingReliability)); ScenarioExpect.Equal(20, patterns.Count(static p => p.Family == PatternFamily.CloudArchitecture)); ScenarioExpect.Equal(26, patterns.Count(static p => p.Family == PatternFamily.ApplicationArchitecture)); }) @@ -249,6 +250,7 @@ public Task Hosting_Integration_Catalog_Audits_Every_Pattern() ScenarioExpect.Equal( new[] { + "Backpressure", "Bulkhead", "Circuit Breaker", "Guaranteed Delivery", diff --git a/test/PatternKit.Generators.Tests/BackpressurePolicyGeneratorTests.cs b/test/PatternKit.Generators.Tests/BackpressurePolicyGeneratorTests.cs new file mode 100644 index 00000000..2c4ffd11 --- /dev/null +++ b/test/PatternKit.Generators.Tests/BackpressurePolicyGeneratorTests.cs @@ -0,0 +1,187 @@ +using Microsoft.CodeAnalysis; +using Microsoft.CodeAnalysis.CSharp; +using PatternKit.Generators.Backpressure; +using TinyBDD; +using TinyBDD.Xunit; +using Xunit.Abstractions; + +namespace PatternKit.Generators.Tests; + +[Feature("Backpressure Policy generator")] +public sealed partial class BackpressurePolicyGeneratorTests(ITestOutputHelper output) : TinyBddXunitBase(output) +{ + [Scenario("Generates backpressure policy factory")] + [Fact] + public Task Generates_Backpressure_Policy_Factory() + => Given("a configured backpressure policy declaration", () => Compile(""" + using PatternKit.Generators.Backpressure; + namespace Demo; + [GenerateBackpressurePolicy(typeof(string), FactoryMethodName = "Build", PolicyName = "checkout", Capacity = 4, Mode = "Wait", WaitTimeoutMilliseconds = 250)] + public static partial class CheckoutBackpressure; + """)) + .Then("generated source creates the configured policy", result => + { + ScenarioExpect.Empty(result.Diagnostics); + var source = ScenarioExpect.Single(result.GeneratedSources); + ScenarioExpect.Equal("CheckoutBackpressure.BackpressurePolicy.g.cs", source.HintName); + ScenarioExpect.Contains("public static partial class CheckoutBackpressure", source.Source); + ScenarioExpect.Contains("Build()", source.Source); + ScenarioExpect.Contains("BackpressurePolicy.Create(\"checkout\")", source.Source); + ScenarioExpect.Contains(".WithCapacity(4)", source.Source); + ScenarioExpect.Contains(".WithMode(global::PatternKit.Messaging.Reliability.Backpressure.BackpressureMode.Wait)", source.Source); + ScenarioExpect.Contains(".WithWaitTimeout(global::System.TimeSpan.FromMilliseconds(250))", source.Source); + ScenarioExpect.True(result.EmitSuccess, string.Join(Environment.NewLine, result.EmitDiagnostics)); + }) + .AssertPassed(); + + [Scenario("Reports diagnostics for invalid backpressure declarations")] + [Theory] + [InlineData("public static class BackpressureHost;", "PKBP001")] + [InlineData("public static partial class BackpressureHost;", "PKBP002", "Capacity = 0")] + [InlineData("public static partial class BackpressureHost;", "PKBP002", "WaitTimeoutMilliseconds = -1")] + [InlineData("public static partial class BackpressureHost;", "PKBP003", "FactoryMethodName = \"1bad\"")] + [InlineData("public static partial class BackpressureHost;", "PKBP003", "FactoryMethodName = \"class\"")] + [InlineData("public static partial class BackpressureHost;", "PKBP004", "Mode = \"Unknown\"")] + public Task Reports_Diagnostics_For_Invalid_Backpressure_Declarations(string declaration, string diagnosticId, string configuration = "") + => Given("an invalid backpressure policy declaration", () => Compile($$""" + using PatternKit.Generators.Backpressure; + [GenerateBackpressurePolicy(typeof(string){{(string.IsNullOrWhiteSpace(configuration) ? "" : ", " + configuration)}})] + {{declaration}} + """)) + .Then("the expected diagnostic is reported", result => + ScenarioExpect.Contains(result.Diagnostics, diagnostic => diagnostic.Id == diagnosticId)) + .AssertPassed(); + + [Scenario("Generates nested backpressure host wrappers")] + [Fact] + public Task Generates_Nested_Backpressure_Host_Wrappers() + => Given("nested backpressure declarations", () => Compile(""" + using PatternKit.Generators.Backpressure; + namespace Demo; + + public partial class BackpressureContainer + { + private partial class PrivateHost + { + [GenerateBackpressurePolicy(typeof(int), Mode = "DropNewest")] + protected partial class ProtectedBackpressure; + } + } + """)) + .Then("generated sources preserve containing partial type wrappers", result => + { + ScenarioExpect.Empty(result.Diagnostics); + var source = ScenarioExpect.Single(result.GeneratedSources); + ScenarioExpect.Contains("public partial class BackpressureContainer", source.Source); + ScenarioExpect.Contains("private partial class PrivateHost", source.Source); + ScenarioExpect.Contains("protected partial class ProtectedBackpressure", source.Source); + ScenarioExpect.Contains("BackpressureMode.DropNewest", source.Source); + ScenarioExpect.True(result.EmitSuccess, string.Join(Environment.NewLine, result.EmitDiagnostics)); + }) + .AssertPassed(); + + [Scenario("Generates backpressure defaults and host shapes")] + [Fact] + public Task Generates_Backpressure_Defaults_And_Host_Shapes() + => Given("backpressure policy declarations with default names and host shapes", () => Compile(""" + using PatternKit.Generators.Backpressure; + namespace Demo; + + [GenerateBackpressurePolicy(typeof(string))] + internal abstract partial class AbstractBackpressure; + + [GenerateBackpressurePolicy(typeof(string), PolicyName = "tenant\\\"backpressure")] + public sealed partial class SealedBackpressure; + + [GenerateBackpressurePolicy(typeof(int))] + internal partial struct StructBackpressure; + """)) + .Then("generated sources preserve host shape and configured defaults", result => + { + ScenarioExpect.Empty(result.Diagnostics); + ScenarioExpect.Equal(3, result.GeneratedSources.Count); + + var combined = string.Join("\n", result.GeneratedSources.Select(static source => source.Source)); + ScenarioExpect.Contains("internal abstract partial class AbstractBackpressure", combined); + ScenarioExpect.Contains("public sealed partial class SealedBackpressure", combined); + ScenarioExpect.Contains("internal partial struct StructBackpressure", combined); + ScenarioExpect.Contains("Create(\"backpressure\")", combined); + ScenarioExpect.Contains("Create(\"tenant\\\\\\\"backpressure\")", combined); + ScenarioExpect.Contains(".WithCapacity(8)", combined); + ScenarioExpect.Contains("BackpressureMode.Reject", combined); + ScenarioExpect.Contains("FromMilliseconds(0)", combined); + ScenarioExpect.True(result.EmitSuccess, string.Join(Environment.NewLine, result.EmitDiagnostics)); + }) + .AssertPassed(); + + [Scenario("Backpressure attribute exposes configured values")] + [Fact] + public Task Backpressure_Attribute_Exposes_Configured_Values() + => Given("a backpressure generator attribute", () => new GenerateBackpressurePolicyAttribute(typeof(string)) + { + FactoryMethodName = "Build", + PolicyName = "checkout", + Capacity = 4, + Mode = "Wait", + WaitTimeoutMilliseconds = 250 + }) + .Then("configuration values are preserved", attribute => + { + ScenarioExpect.Equal(typeof(string), attribute.ResultType); + ScenarioExpect.Equal("Build", attribute.FactoryMethodName); + ScenarioExpect.Equal("checkout", attribute.PolicyName); + ScenarioExpect.Equal(4, attribute.Capacity); + ScenarioExpect.Equal("Wait", attribute.Mode); + ScenarioExpect.Equal(250, attribute.WaitTimeoutMilliseconds); + ScenarioExpect.Throws(() => new GenerateBackpressurePolicyAttribute(null!)); + }) + .AssertPassed(); + + [Scenario("Skips malformed backpressure result type")] + [Fact] + public Task Skips_Malformed_Backpressure_Result_Type() + => Given("a backpressure declaration with a null result type", () => Compile(""" + using PatternKit.Generators.Backpressure; + [GenerateBackpressurePolicy(null!)] + public static partial class BackpressureHost; + """)) + .Then("no source is generated", result => + ScenarioExpect.Empty(result.GeneratedSources)) + .AssertPassed(); + + private static GeneratorResult Compile(string source) + { + var compilation = CreateCompilation(source, "BackpressurePolicyGeneratorTests"); + _ = RoslynTestHelpers.Run(compilation, new BackpressurePolicyGenerator(), out var run, out var updated); + var result = run.Results.Single(); + var emit = updated.Emit(Stream.Null); + return new GeneratorResult( + result.Diagnostics.ToArray(), + result.GeneratedSources.Select(static source => new GeneratedSource(source.HintName, source.SourceText.ToString())).ToArray(), + emit.Success, + emit.Diagnostics.Select(static diagnostic => diagnostic.ToString()).ToArray()); + } + + private static CSharpCompilation CreateCompilation(string source, string assemblyName) + => RoslynTestHelpers.CreateCompilation( + source, + assemblyName, + extra: + [ + MetadataReference.CreateFromFile(GetAbstractionsAssemblyPath()), + MetadataReference.CreateFromFile(typeof(PatternKit.Messaging.Reliability.Backpressure.BackpressurePolicy<>).Assembly.Location) + ]); + + private static string GetAbstractionsAssemblyPath() + => Path.Combine( + Path.GetDirectoryName(typeof(BackpressurePolicyGenerator).Assembly.Location)!, + "PatternKit.Generators.Abstractions.dll"); + + private sealed record GeneratorResult( + IReadOnlyList Diagnostics, + IReadOnlyList GeneratedSources, + bool EmitSuccess, + IReadOnlyList EmitDiagnostics); + + private sealed record GeneratedSource(string HintName, string Source); +} diff --git a/test/PatternKit.Hosting.Extensions.Tests/DependencyInjection/PatternKitServiceCollectionExtensionsTests.cs b/test/PatternKit.Hosting.Extensions.Tests/DependencyInjection/PatternKitServiceCollectionExtensionsTests.cs index 6e76d97e..fbfe3827 100644 --- a/test/PatternKit.Hosting.Extensions.Tests/DependencyInjection/PatternKitServiceCollectionExtensionsTests.cs +++ b/test/PatternKit.Hosting.Extensions.Tests/DependencyInjection/PatternKitServiceCollectionExtensionsTests.cs @@ -10,6 +10,7 @@ using PatternKit.Messaging; using PatternKit.Messaging.Channels; using PatternKit.Messaging.Reliability; +using PatternKit.Messaging.Reliability.Backpressure; using PatternKit.Messaging.Storage; using TinyBDD; using TinyBDD.Xunit; @@ -75,6 +76,9 @@ public Task Cloud_Resilience_Primitives_Register_Through_IServiceCollection() .AddPatternKitBulkheadPolicy( "inventory-bulkhead", builder => builder.WithMaxConcurrency(2)) + .AddPatternKitBackpressurePolicy( + "inventory-backpressure", + builder => builder.WithCapacity(2).WithMode(BackpressureMode.Wait)) .AddPatternKitRateLimitPolicy( "inventory-rate-limit", builder => builder.WithPermitLimit(1).WithWindow(TimeSpan.FromMinutes(1))) @@ -94,6 +98,7 @@ public Task Cloud_Resilience_Primitives_Register_Through_IServiceCollection() var retry = provider.GetRequiredService>(); var breaker = provider.GetRequiredService>(); var bulkhead = provider.GetRequiredService>(); + var backpressure = provider.GetRequiredService>(); var rateLimit = provider.GetRequiredService>(); var leveling = provider.GetRequiredService>(); var priority = provider.GetRequiredService>(); @@ -101,6 +106,7 @@ public Task Cloud_Resilience_Primitives_Register_Through_IServiceCollection() var retryResult = retry.Execute(static () => new ServiceReply(true)); var breakerResult = breaker.Execute(static () => new ServiceReply(true)); var bulkheadResult = bulkhead.Execute(static () => new ServiceReply(true)); + var backpressureResult = backpressure.Execute(static () => new ServiceReply(true)); var rateLimitResult = rateLimit.Execute("tenant-a", static () => new ServiceReply(true)); var levelingResult = leveling.Execute(static () => new ServiceReply(true)); priority.Enqueue(new("slow", 1)); @@ -111,12 +117,14 @@ public Task Cloud_Resilience_Primitives_Register_Through_IServiceCollection() retry, breaker, bulkhead, + backpressure, rateLimit, leveling, priority, retryResult, breakerResult, bulkheadResult, + backpressureResult, rateLimitResult, levelingResult, next); @@ -130,6 +138,8 @@ public Task Cloud_Resilience_Primitives_Register_Through_IServiceCollection() ScenarioExpect.True(result.BreakerResult.Succeeded); ScenarioExpect.Equal("inventory-bulkhead", result.Bulkhead.Name); ScenarioExpect.True(result.BulkheadResult.Succeeded); + ScenarioExpect.Equal("inventory-backpressure", result.Backpressure.Name); + ScenarioExpect.True(result.BackpressureResult.Accepted); ScenarioExpect.Equal("inventory-rate-limit", result.RateLimit.Name); ScenarioExpect.True(result.RateLimitResult.Allowed); ScenarioExpect.Equal("inventory-leveling", result.Leveling.Name); @@ -193,6 +203,8 @@ public Task Hosting_Extensions_Validate_Registration_Input() () => inputs.MissingServices!.AddPatternKitMessageChannel()), ScenarioExpect.Throws( () => inputs.Services.AddPatternKitPriorityQueue(null!)), + ScenarioExpect.Throws( + () => inputs.MissingServices!.AddPatternKitBackpressurePolicy()), ScenarioExpect.Throws( () => inputs.MissingServices!.AddPatternKitNullObject(new SilentNotificationSink())), ScenarioExpect.Throws( @@ -209,6 +221,7 @@ public Task Hosting_Extensions_Validate_Registration_Input() { ScenarioExpect.Equal("services", results.MissingServicesException.ParamName); ScenarioExpect.Equal("prioritySelector", results.PrioritySelectorException.ParamName); + ScenarioExpect.Equal("services", results.BackpressureMissingServicesException.ParamName); ScenarioExpect.Equal("services", results.NullObjectInstanceMissingServicesException.ParamName); ScenarioExpect.Equal("services", results.NullObjectFactoryMissingServicesException.ParamName); ScenarioExpect.Equal("instance", results.NullObjectInstanceException.ParamName); @@ -246,6 +259,7 @@ private sealed record InvalidRegistrationInputs(IServiceCollection? MissingServi private sealed record InvalidRegistrationResults( ArgumentNullException MissingServicesException, ArgumentNullException PrioritySelectorException, + ArgumentNullException BackpressureMissingServicesException, ArgumentNullException NullObjectInstanceMissingServicesException, ArgumentNullException NullObjectFactoryMissingServicesException, ArgumentNullException NullObjectInstanceException, @@ -265,12 +279,14 @@ private sealed record CloudRegistrationResult( RetryPolicy Retry, CircuitBreakerPolicy Breaker, BulkheadPolicy Bulkhead, + BackpressurePolicy Backpressure, RateLimitPolicy RateLimit, QueueLoadLevelingPolicy Leveling, PriorityQueuePolicy Priority, RetryResult RetryResult, CircuitBreakerResult BreakerResult, BulkheadResult BulkheadResult, + BackpressureResult BackpressureResult, RateLimitResult RateLimitResult, QueueLoadLevelingResult LevelingResult, PriorityQueueDequeueResult Next); diff --git a/test/PatternKit.Tests/Messaging/Reliability/Backpressure/BackpressurePolicyTests.cs b/test/PatternKit.Tests/Messaging/Reliability/Backpressure/BackpressurePolicyTests.cs new file mode 100644 index 00000000..d19de750 --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Reliability/Backpressure/BackpressurePolicyTests.cs @@ -0,0 +1,191 @@ +using PatternKit.Messaging.Reliability.Backpressure; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Reliability.Backpressure; + +public sealed class BackpressurePolicyTests +{ + [Scenario("Backpressure rejects work when saturated")] + [Fact] + public async Task Backpressure_Rejects_Work_When_Saturated() + { + var release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var entered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var policy = BackpressurePolicy.Create("orders") + .WithCapacity(1) + .WithMode(BackpressureMode.Reject) + .Build(); + + var first = policy.ExecuteAsync(async _ => + { + entered.SetResult(); + await release.Task; + return "accepted"; + }); + await entered.Task; + + var rejected = await policy.ExecuteAsync(static _ => new ValueTask("overflow")); + release.SetResult(); + var accepted = await first; + + ScenarioExpect.True(accepted.Accepted); + ScenarioExpect.True(rejected.Rejected); + ScenarioExpect.False(rejected.Accepted); + ScenarioExpect.Equal("orders", policy.Name); + ScenarioExpect.Equal(1, policy.Capacity); + ScenarioExpect.Equal(BackpressureMode.Reject, policy.Mode); + } + + [Scenario("Backpressure waits for capacity")] + [Fact] + public async Task Backpressure_Waits_For_Capacity() + { + var release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var entered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var policy = BackpressurePolicy.Create("wait") + .WithCapacity(1) + .WithMode(BackpressureMode.Wait) + .WithWaitTimeout(TimeSpan.FromSeconds(2)) + .Build(); + + var first = policy.ExecuteAsync(async _ => + { + entered.SetResult(); + await release.Task; + return 1; + }); + await entered.Task; + + var second = policy.ExecuteAsync(static _ => new ValueTask(2)); + release.SetResult(); + + _ = await first; + var waited = await second; + + ScenarioExpect.True(waited.Accepted); + ScenarioExpect.True(waited.Waited); + ScenarioExpect.Equal(2, waited.Value); + ScenarioExpect.Equal(0, policy.ActiveCount); + } + + [Scenario("Synchronous backpressure reports explicit saturation policies")] + [Theory] + [InlineData(BackpressureMode.Reject, false, false, false, true)] + [InlineData(BackpressureMode.DropNewest, true, false, false, false)] + [InlineData(BackpressureMode.Shed, false, true, false, false)] + [InlineData(BackpressureMode.Observe, false, false, true, false)] + [InlineData(BackpressureMode.DropOldest, true, false, false, false)] + public async Task Synchronous_Backpressure_Reports_Explicit_Saturation_Policies( + BackpressureMode mode, + bool dropped, + bool shed, + bool observed, + bool rejected) + { + var release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var entered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var policy = BackpressurePolicy.Create("sync-policy") + .WithCapacity(1) + .WithMode(mode) + .Build(); + + var first = Task.Run(() => policy.Execute(() => + { + entered.SetResult(); + release.Task.GetAwaiter().GetResult(); + return "active"; + })); + await entered.Task; + + var saturated = policy.Execute(static () => "fallback"); + release.SetResult(); + var completed = await first; + + ScenarioExpect.True(completed.Accepted); + ScenarioExpect.Equal(dropped, saturated.Dropped); + ScenarioExpect.Equal(shed, saturated.Shed); + ScenarioExpect.Equal(observed, saturated.Observed); + ScenarioExpect.Equal(rejected, saturated.Rejected); + ScenarioExpect.Equal(mode is BackpressureMode.DropNewest or BackpressureMode.DropOldest ? 1 : 0, policy.DroppedCount); + } + + [Scenario("Synchronous backpressure wait can time out")] + [Fact] + public async Task Synchronous_Backpressure_Wait_Can_Time_Out() + { + var release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var entered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var policy = BackpressurePolicy.Create("sync-timeout") + .WithCapacity(1) + .WithMode(BackpressureMode.Wait) + .WithWaitTimeout(TimeSpan.FromMilliseconds(10)) + .Build(); + + var first = Task.Run(() => policy.Execute(() => + { + entered.SetResult(); + release.Task.GetAwaiter().GetResult(); + return "active"; + })); + await entered.Task; + + var timedOut = policy.Execute(static () => "late"); + release.SetResult(); + _ = await first; + + ScenarioExpect.True(timedOut.Rejected); + ScenarioExpect.False(timedOut.Accepted); + ScenarioExpect.Equal(0, policy.ActiveCount); + } + + [Scenario("Backpressure reports explicit saturation policies")] + [Theory] + [InlineData(BackpressureMode.DropNewest, true, false, false)] + [InlineData(BackpressureMode.Shed, false, true, false)] + [InlineData(BackpressureMode.Observe, false, false, true)] + [InlineData(BackpressureMode.DropOldest, true, false, false)] + public async Task Backpressure_Reports_Explicit_Saturation_Policies( + BackpressureMode mode, + bool dropped, + bool shed, + bool observed) + { + var release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var entered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var policy = BackpressurePolicy.Create("policy") + .WithCapacity(1) + .WithMode(mode) + .Build(); + + var first = policy.ExecuteAsync(async _ => + { + entered.SetResult(); + await release.Task; + return "active"; + }); + await entered.Task; + + var saturated = await policy.ExecuteAsync(static _ => new ValueTask("fallback")); + release.SetResult(); + _ = await first; + + ScenarioExpect.Equal(dropped, saturated.Dropped); + ScenarioExpect.Equal(shed, saturated.Shed); + ScenarioExpect.Equal(observed, saturated.Observed); + ScenarioExpect.Equal(mode is BackpressureMode.DropNewest or BackpressureMode.DropOldest ? 1 : 0, policy.DroppedCount); + } + + [Scenario("Backpressure validates configuration and callbacks")] + [Fact] + public async Task Backpressure_Validates_Configuration_And_Callbacks() + { + var policy = BackpressurePolicy.Create().Build(); + + ScenarioExpect.Throws(() => BackpressurePolicy.Create("").Build()); + ScenarioExpect.Throws(() => BackpressurePolicy.Create().WithCapacity(0).Build()); + ScenarioExpect.Throws(() => BackpressurePolicy.Create().WithMode((BackpressureMode)99).Build()); + ScenarioExpect.Throws(() => BackpressurePolicy.Create().WithWaitTimeout(TimeSpan.FromMilliseconds(-1)).Build()); + ScenarioExpect.Throws(() => policy.Execute(null!)); + await ScenarioExpect.ThrowsAsync(() => policy.ExecuteAsync(null!).AsTask()); + } +}