-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(api): make API calls non-blocking #6733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -88,17 +88,15 @@ private void throwTronError(String strategy, String params, String servlet, Exc | |
| @Override | ||
| protected void service(HttpServletRequest req, HttpServletResponse resp) | ||
| throws ServletException, IOException { | ||
|
|
||
| RuntimeData runtimeData = new RuntimeData(req); | ||
| GlobalRateLimiter.acquire(runtimeData); | ||
|
|
||
| RuntimeData runtimeData = new RuntimeData(req); | ||
| IRateLimiter rateLimiter = container.get(KEY_PREFIX_HTTP, getClass().getSimpleName()); | ||
|
|
||
| boolean acquireResource = true; | ||
| // Check per-endpoint first to avoid consuming global IP/QPS quota for requests | ||
| // that would be rejected by the per-endpoint limiter anyway. | ||
| boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData); | ||
| boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData); | ||
|
|
||
| if (rateLimiter != null) { | ||
| acquireResource = rateLimiter.acquire(runtimeData); | ||
| } | ||
| String contextPath = req.getContextPath(); | ||
| String url = Strings.isNullOrEmpty(req.getServletPath()) | ||
| ? MetricLabels.UNDEFINED : contextPath + req.getServletPath(); | ||
|
|
@@ -119,7 +117,9 @@ protected void service(HttpServletRequest req, HttpServletResponse resp) | |
| } catch (Exception unexpected) { | ||
| logger.error("Http Api {}, Method:{}. Error:", url, req.getMethod(), unexpected); | ||
| } finally { | ||
| if (rateLimiter instanceof IPreemptibleRateLimiter && acquireResource) { | ||
| // Release whenever the per-endpoint permit was acquired (covers both the normal | ||
| // completion path and the case where GlobalRateLimiter rejected the request). | ||
| if (rateLimiter instanceof IPreemptibleRateLimiter && perEndpointAcquired) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
(commenting on line 122 because the affected line, 113, is outside the diff hunk)
[NIT] `IllegalAccessException` is the wrong exception type for rate limiting.
Suggestion: use a clearer message or a dedicated exception type.
||
| ((IPreemptibleRateLimiter) rateLimiter).release(); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,8 +5,10 @@ | |
| import com.google.common.cache.CacheBuilder; | ||
| import com.google.common.util.concurrent.RateLimiter; | ||
| import java.util.concurrent.TimeUnit; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.tron.core.config.args.Args; | ||
|
|
||
| @Slf4j | ||
| public class GlobalRateLimiter { | ||
|
|
||
| private static double QPS = Args.getInstance().getRateLimiterGlobalQps(); | ||
|
Collaborator
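There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
(commenting on line 14 because the affected line, 16, is outside the diff hunk)
[NIT] `IP_QPS <= 0` misconfiguration causes a silent fail-closed warn storm.
If `IP_QPS` is configured as zero or negative, `RateLimiter.create(IP_QPS)` throws inside the `cache.get` loader, so `tryAcquire` logs a warning and denies every request that carries a remote address.
Suggestion: validate `IP_QPS` when the configuration is loaded and reject non-positive values up front.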
||
|
|
@@ -18,18 +20,24 @@ public class GlobalRateLimiter { | |
|
|
||
| private static RateLimiter rateLimiter = RateLimiter.create(QPS); | ||
|
|
||
| public static void acquire(RuntimeData runtimeData) { | ||
| rateLimiter.acquire(); | ||
| public static boolean tryAcquire(RuntimeData runtimeData) { | ||
| String ip = runtimeData.getRemoteAddr(); | ||
| if (Strings.isNullOrEmpty(ip)) { | ||
| return; | ||
| if (!Strings.isNullOrEmpty(ip)) { | ||
| RateLimiter r; | ||
| try { | ||
| // cache.get is atomic: only one loader executes per key under concurrent requests, | ||
| // preventing multiple RateLimiter instances from being created for the same IP. | ||
| r = cache.get(ip, () -> RateLimiter.create(IP_QPS)); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
[NIT] The `cache.get` atomicity claim has no concurrent regression test.
The new code comment asserts that only one loader executes per key under concurrent requests, but no test exercises that guarantee.
Suggestion: add a small concurrent test (CountDownLatch + 2+ threads + AtomicInteger counting loader invocations) that asserts the loader runs at most once per key under concurrent access.
||
| } catch (Exception e) { | ||
| logger.warn("Failed to load IP rate limiter for {}, denying request: {}", | ||
| ip, e.getMessage()); | ||
| return false; | ||
| } | ||
| if (!r.tryAcquire()) { | ||
| return false; | ||
| } | ||
| } | ||
| RateLimiter r = cache.getIfPresent(ip); | ||
| if (r == null) { | ||
| r = RateLimiter.create(IP_QPS); | ||
| cache.put(ip, r); | ||
| } | ||
| r.acquire(); | ||
| return rateLimiter.tryAcquire(); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,44 +104,49 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, | |
| IRateLimiter rateLimiter = container | ||
| .get(KEY_PREFIX_RPC, call.getMethodDescriptor().getFullMethodName()); | ||
|
|
||
| RuntimeData runtimeData = new RuntimeData(call); | ||
| GlobalRateLimiter.acquire(runtimeData); | ||
|
|
||
| boolean acquireResource = true; | ||
| Listener<ReqT> listener = new ServerCall.Listener<ReqT>() {}; | ||
|
|
||
| if (rateLimiter != null) { | ||
| acquireResource = rateLimiter.acquire(runtimeData); | ||
| RuntimeData runtimeData = new RuntimeData(call); | ||
| // Check per-endpoint first to avoid consuming global IP/QPS quota for requests | ||
| // that would be rejected by the per-endpoint limiter anyway. | ||
| boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData); | ||
| boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData); | ||
|
|
||
| if (!acquireResource) { | ||
| // Release the per-endpoint permit when global rejected, to avoid semaphore leak. | ||
| if (rateLimiter instanceof IPreemptibleRateLimiter && perEndpointAcquired) { | ||
| ((IPreemptibleRateLimiter) rateLimiter).release(); | ||
| } | ||
| call.close(Status.fromCode(Code.RESOURCE_EXHAUSTED), new Metadata()); | ||
|
Collaborator
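There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
[NIT] No metrics on rate-limit reject paths.
The early-return path here (and the symmetric HTTP one in the servlet's `service` method) rejects the request without recording any metric, so rate-limited traffic is not visible in monitoring.
Suggestion: add a dedicated meter or counter for rejected requests on both paths.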
||
| return listener; | ||
| } | ||
|
|
||
| Listener<ReqT> listener = new ServerCall.Listener<ReqT>() { | ||
| }; | ||
|
|
||
| try { | ||
| if (acquireResource) { | ||
| call.setMessageCompression(true); | ||
| ServerCall.Listener<ReqT> delegate = next.startCall(call, headers); | ||
|
|
||
| listener = new SimpleForwardingServerCallListener<ReqT>(delegate) { | ||
| @Override | ||
| public void onComplete() { | ||
| // must release the permit to avoid the leak of permit. | ||
| if (rateLimiter instanceof IPreemptibleRateLimiter) { | ||
| ((IPreemptibleRateLimiter) rateLimiter).release(); | ||
| } | ||
| call.setMessageCompression(true); | ||
| ServerCall.Listener<ReqT> delegate = next.startCall(call, headers); | ||
|
|
||
| listener = new SimpleForwardingServerCallListener<ReqT>(delegate) { | ||
| @Override | ||
| public void onComplete() { | ||
| // must release the permit to avoid the leak of permit. | ||
| if (rateLimiter instanceof IPreemptibleRateLimiter) { | ||
| ((IPreemptibleRateLimiter) rateLimiter).release(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onCancel() { | ||
| // must release the permit to avoid the leak of permit. | ||
| if (rateLimiter instanceof IPreemptibleRateLimiter) { | ||
| ((IPreemptibleRateLimiter) rateLimiter).release(); | ||
| } | ||
| @Override | ||
| public void onCancel() { | ||
| // must release the permit to avoid the leak of permit. | ||
| if (rateLimiter instanceof IPreemptibleRateLimiter) { | ||
| ((IPreemptibleRateLimiter) rateLimiter).release(); | ||
| } | ||
| }; | ||
| } else { | ||
| call.close(Status.fromCode(Code.RESOURCE_EXHAUSTED), new Metadata()); | ||
| } | ||
| } | ||
| }; | ||
| } catch (Exception e) { | ||
|
Collaborator
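There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
[SHOULD] The catch block on `startCall` failure should close the gRPC call.
When `next.startCall()` throws, the catch block releases the permit and marks the failure metrics but does not close the call.
Suggestion: call `call.close(...)` with an error status in the catch block.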
||
| // next.startCall() failed — release the permit that was already acquired. | ||
| if (rateLimiter instanceof IPreemptibleRateLimiter) { | ||
| ((IPreemptibleRateLimiter) rateLimiter).release(); | ||
| } | ||
| String grpcFailMeterName = MetricsKey.NET_API_DETAIL_FAIL_QPS | ||
| + call.getMethodDescriptor().getFullMethodName(); | ||
| MetricsUtil.meterMark(MetricsKey.NET_API_FAIL_QPS); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,6 @@ | |
|
|
||
| public interface IRateLimiter { | ||
|
|
||
| boolean acquire(RuntimeData data); | ||
| boolean tryAcquire(RuntimeData data); | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(commenting on line 98 because the affected line, 113, is outside the diff hunk)
[SHOULD] HTTP rejection should set 429 status, not implicit 200
The PR description says "Over-limit requests now return HTTP 429", but this branch only writes an error JSON body via `resp.getWriter().println(...)` and never calls `resp.setStatus(...)`. The HTTP status stays at the default 200, so client SDKs cannot back off based on status code, gateways and Prometheus templates classify rate-limited requests as success, and the behavior is asymmetric with the gRPC path that returns RESOURCE_EXHAUSTED.
Suggestion: call `resp.setStatus(HttpServletResponse.SC_TOO_MANY_REQUESTS)` here (and consider a `Retry-After` header), or update the PR description to remove the 429 claim.