diff --git a/packages/forest_admin_agent/lib/forest_admin_agent/http/router.rb b/packages/forest_admin_agent/lib/forest_admin_agent/http/router.rb index ca62905e2..4bd0b896f 100644 --- a/packages/forest_admin_agent/lib/forest_admin_agent/http/router.rb +++ b/packages/forest_admin_agent/lib/forest_admin_agent/http/router.rb @@ -63,7 +63,8 @@ def self.routes { name: 'associate_related', handler: -> { Resources::Related::AssociateRelated.new.routes } }, { name: 'dissociate_related', handler: -> { Resources::Related::DissociateRelated.new.routes } }, { name: 'update_related', handler: -> { Resources::Related::UpdateRelated.new.routes } }, - { name: 'update_field', handler: -> { Resources::UpdateField.new.routes } } + { name: 'update_field', handler: -> { Resources::UpdateField.new.routes } }, + { name: 'workflow_executor_proxy', handler: -> { Workflow::WorkflowExecutorProxy.new.routes } } ] all_routes = {} diff --git a/packages/forest_admin_agent/lib/forest_admin_agent/routes/workflow/workflow_executor_proxy.rb b/packages/forest_admin_agent/lib/forest_admin_agent/routes/workflow/workflow_executor_proxy.rb new file mode 100644 index 000000000..e04e51317 --- /dev/null +++ b/packages/forest_admin_agent/lib/forest_admin_agent/routes/workflow/workflow_executor_proxy.rb @@ -0,0 +1,136 @@ +require 'faraday' + +module ForestAdminAgent + module Routes + module Workflow + # Forwards workflow-execution traffic from the agent to the workflow executor. + # Mounted only when the integrator sets `workflow_executor_url` + class WorkflowExecutorProxy < AbstractAuthenticatedRoute + AGENT_PREFIX = '/_internal/workflow-executions'.freeze + EXECUTOR_PREFIX = '/runs'.freeze + FORWARDED_HEADERS = %w[Authorization Cookie].freeze + ROUTING_KEYS = %w[run_id route_alias controller action format].freeze + + def setup_routes + return self unless executor_configured? + + add_route( + 'forest_workflow_run_show', + 'get', + "#{AGENT_PREFIX}/:run_id", + ->(args) { handle_request(:get, args) } + ) + add_route( + 'forest_workflow_run_trigger', + 'post', + "#{AGENT_PREFIX}/:run_id/trigger", + ->(args) { handle_request(:post, args) } + ) + + self + end + + def handle_request(method, args = {}) + build(args) + + base_url = configured_executor_url + run_id = args.dig(:params, 'run_id') || args.dig(:params, :run_id) + path = build_path(run_id, method) + response = forward(method, base_url, path, args) + + { + content: response.body, + status: response.status, + headers: forwarded_response_headers(response) + } + end + + private + + def executor_configured? + url = ForestAdminAgent::Facades::Container.config_from_cache[:workflow_executor_url] + !(url.nil? || url.to_s.strip.empty?) + rescue StandardError + # Container not yet populated (e.g. boot-order edge case): treat as disabled. + false + end + + def configured_executor_url + url = ForestAdminAgent::Facades::Container.config_from_cache[:workflow_executor_url] + if url.nil? || url.to_s.strip.empty? + raise Http::Exceptions::NotFoundError, 'Workflow executor proxy is not configured' + end + + url.to_s.sub(%r{/+\z}, '') + end + + def build_path(run_id, method) + suffix = method == :post ? '/trigger' : '' + "#{EXECUTOR_PREFIX}/#{run_id}#{suffix}" + end + + def forward(method, base_url, path, args) + query = forwarded_query_params(args[:params]) + headers = forwarded_request_headers(args[:headers]) + body = forwarded_body(method, args[:params]) + + client = build_client(base_url) + client.run_request(method, path, body, headers) do |req| + req.params.update(query) unless query.empty? + end + rescue Faraday::TimeoutError => e + raise Http::Exceptions::ServiceUnavailableError.new('Workflow executor timed out', cause: e) + rescue Faraday::ConnectionFailed => e + raise Http::Exceptions::ServiceUnavailableError.new('Workflow executor unreachable', cause: e) + end + + def build_client(base_url) + Faraday.new(url: base_url) do |f| + f.request :json + f.response :json, content_type: /\bjson$/ + f.adapter Faraday.default_adapter + end + end + + # Strip Rails-injected routing keys; keep only true client query params. + def forwarded_query_params(params) + return {} unless params.is_a?(Hash) + + params.each_with_object({}) do |(key, value), acc| + next if ROUTING_KEYS.include?(key.to_s) + next if value.is_a?(Hash) || value.is_a?(Array) # 'data' body, etc. + + acc[key.to_s] = value + end + end + + def forwarded_request_headers(headers) + return {} unless headers.is_a?(Hash) + + FORWARDED_HEADERS.each_with_object({}) do |name, acc| + value = headers[name] || headers[name.downcase] || headers["HTTP_#{name.upcase}"] + acc[name] = value if value && !value.to_s.empty? + end + end + + def forwarded_body(method, params) + return nil if method == :get + return nil unless params.is_a?(Hash) + + # JSON request bodies arrive parsed under :data when sent as JSON:API, + # or as the raw top-level params hash otherwise. Prefer :data when + # present; fall back to a sanitized copy of params. + body = params['data'] || params[:data] + return body if body + + params.reject { |key, _| ROUTING_KEYS.include?(key.to_s) } + end + + def forwarded_response_headers(response) + content_type = response.headers['content-type'] || response.headers['Content-Type'] + content_type ? { 'Content-Type' => content_type } : {} + end + end + end + end +end diff --git a/packages/forest_admin_agent/sig/forest_admin_agent/routes/abstract_route.rbs b/packages/forest_admin_agent/sig/forest_admin_agent/routes/abstract_route.rbs deleted file mode 100644 index 114e949e6..000000000 --- a/packages/forest_admin_agent/sig/forest_admin_agent/routes/abstract_route.rbs +++ /dev/null @@ -1,12 +0,0 @@ -module ForestAdminAgent - module Routes - class AbstractRoute - def initialize : -> void - def routes: -> {} - def add_route: (String, String,String, String) -> void - def setup: -> AbstractRoute - - def setup_routes: -> void - end - end -end diff --git a/packages/forest_admin_agent/spec/lib/forest_admin_agent/routes/workflow/workflow_executor_proxy_spec.rb b/packages/forest_admin_agent/spec/lib/forest_admin_agent/routes/workflow/workflow_executor_proxy_spec.rb new file mode 100644 index 000000000..c4d7aace6 --- /dev/null +++ b/packages/forest_admin_agent/spec/lib/forest_admin_agent/routes/workflow/workflow_executor_proxy_spec.rb @@ -0,0 +1,236 @@ +require 'spec_helper' +require 'faraday' + +module ForestAdminAgent + module Routes + module Workflow + describe WorkflowExecutorProxy do + include_context 'with caller' + + subject(:proxy) { described_class.new } + + let(:executor_url) { 'http://workflow-executor.test:4001' } + let(:run_id) { 'run_abc123' } + let(:headers) do + { + 'HTTP_AUTHORIZATION' => bearer, + 'Authorization' => bearer, + 'Cookie' => 'forest_session_token=abc', + 'forest-secret-key' => 'should-not-be-forwarded' + } + end + + # Capture-based Faraday stubbing: store the request that hit `run_request` + # so each test can assert exact url/method/body/headers/params. + let(:captured) { {} } + let(:fake_response) do + instance_double( + Faraday::Response, + status: 200, + body: { 'id' => run_id, 'state' => 'pending' }, + headers: { 'content-type' => 'application/json' } + ) + end + + def override_config(extra) + container = ForestAdminAgent::Builder::AgentFactory.instance.container + existing = container.resolve(:config) + container._container.delete('config') + container.register(:config, existing.merge(extra)) + end + + before do + override_config(workflow_executor_url: executor_url) + + # Skip the real JWT/permissions resolution: we test proxy behavior, not + # the auth chain itself (covered by AbstractAuthenticatedRoute specs). + allow(proxy).to receive(:build).and_return(nil) + + faraday_request_class = Struct.new(:params) + captured_state = captured + response_for_request = fake_response + + allow(Faraday).to receive(:new).and_wrap_original do |original, *args, &block| + connection = original.call(*args, &block) + allow(connection).to receive(:run_request) do |method, path, body, hdrs, &req_block| + captured_params = {} + req_block&.call(faraday_request_class.new(captured_params)) + captured_state.merge!( + method: method, + url: "#{args.first[:url]}#{path}", + body: body, + headers: hdrs, + query: captured_params.dup + ) + response_for_request + end + connection + end + end + + describe '#setup_routes' do + it 'registers the GET /_internal/workflow-executions/:run_id route' do + proxy.setup_routes + route = proxy.routes['forest_workflow_run_show'] + expect(route).to include( + method: 'get', + uri: '/_internal/workflow-executions/:run_id' + ) + end + + it 'registers the POST /_internal/workflow-executions/:run_id/trigger route' do + proxy.setup_routes + route = proxy.routes['forest_workflow_run_trigger'] + expect(route).to include( + method: 'post', + uri: '/_internal/workflow-executions/:run_id/trigger' + ) + end + + context 'when workflow_executor_url is nil' do + before { override_config(workflow_executor_url: nil) } + + it 'does not register any route (so they are absent from rails routes)' do + expect(described_class.new.routes).to be_empty + end + end + + context 'when workflow_executor_url is blank' do + before { override_config(workflow_executor_url: ' ') } + + it 'does not register any route' do + expect(described_class.new.routes).to be_empty + end + end + end + + describe '#handle_request (GET)' do + let(:args) do + { + headers: headers, + params: { 'run_id' => run_id, 'page' => '2', 'route_alias' => 'forest_workflow_run_show' } + } + end + + it 'forwards GET to the executor /runs/:run_id path' do + proxy.handle_request(:get, args) + expect(captured[:method]).to eq(:get) + expect(captured[:url]).to eq("#{executor_url}/runs/#{run_id}") + end + + it 'forwards client query params and drops Rails routing keys' do + proxy.handle_request(:get, args) + expect(captured[:query]).to eq('page' => '2') + end + + it 'forwards only Authorization and Cookie headers' do + proxy.handle_request(:get, args) + expect(captured[:headers]).to eq( + 'Authorization' => bearer, + 'Cookie' => 'forest_session_token=abc' + ) + end + + it 'returns the executor status, body and Content-Type to the controller' do + result = proxy.handle_request(:get, args) + expect(result).to eq( + content: { 'id' => run_id, 'state' => 'pending' }, + status: 200, + headers: { 'Content-Type' => 'application/json' } + ) + end + end + + describe '#handle_request (POST trigger)' do + let(:args) do + { + headers: headers, + params: { + 'run_id' => run_id, + 'data' => { 'step' => 'approve', 'value' => 42 } + } + } + end + + it 'forwards POST to the executor /runs/:run_id/trigger path with the JSON body' do + proxy.handle_request(:post, args) + expect(captured[:method]).to eq(:post) + expect(captured[:url]).to eq("#{executor_url}/runs/#{run_id}/trigger") + expect(captured[:body]).to eq('step' => 'approve', 'value' => 42) + end + end + + describe 'when workflow_executor_url is not configured' do + before do + override_config(workflow_executor_url: nil) + end + + it 'raises NotFoundError so the controller renders 404' do + expect do + proxy.handle_request(:get, headers: headers, params: { 'run_id' => run_id }) + end.to raise_error(Http::Exceptions::NotFoundError, /not configured/) + end + end + + describe 'when the executor is unreachable' do + before do + allow(Faraday).to receive(:new).and_wrap_original do |original, *args, &block| + connection = original.call(*args, &block) + allow(connection).to receive(:run_request).and_raise( + Faraday::ConnectionFailed.new('boom') + ) + connection + end + end + + it 'raises ServiceUnavailableError (translated to 503 by ErrorTranslator)' do + expect do + proxy.handle_request(:get, headers: headers, params: { 'run_id' => run_id }) + end.to raise_error(Http::Exceptions::ServiceUnavailableError, /unreachable/) + end + end + + describe 'when the executor times out' do + before do + allow(Faraday).to receive(:new).and_wrap_original do |original, *args, &block| + connection = original.call(*args, &block) + allow(connection).to receive(:run_request).and_raise( + Faraday::TimeoutError.new('slow') + ) + connection + end + end + + it 'raises ServiceUnavailableError' do + expect do + proxy.handle_request(:get, headers: headers, params: { 'run_id' => run_id }) + end.to raise_error(Http::Exceptions::ServiceUnavailableError, /timed out/) + end + end + + describe 'when the executor returns a non-2xx response' do + let(:fake_response) do + instance_double( + Faraday::Response, + status: 422, + body: { 'error' => 'invalid step' }, + headers: { 'content-type' => 'application/json' } + ) + end + + it 'forwards the status and body verbatim' do + result = proxy.handle_request(:get, headers: headers, params: { 'run_id' => run_id }) + expect(result[:status]).to eq(422) + expect(result[:content]).to eq('error' => 'invalid step') + end + end + + describe 'class hierarchy' do + it 'inherits from AbstractAuthenticatedRoute (so JWT auth runs before forwarding)' do + expect(described_class.ancestors).to include(AbstractAuthenticatedRoute) + end + end + end + end + end +end diff --git a/packages/forest_admin_rails/lib/forest_admin_rails.rb b/packages/forest_admin_rails/lib/forest_admin_rails.rb index 5fb0b5037..2606f91be 100644 --- a/packages/forest_admin_rails/lib/forest_admin_rails.rb +++ b/packages/forest_admin_rails/lib/forest_admin_rails.rb @@ -33,6 +33,7 @@ module ForestAdminRails setting :skip_schema_update, default: false setting :disable_route_cache, default: false setting :rpc_max_polling_threads, default: nil + setting :workflow_executor_url, default: nil if defined?(Rails::Railtie) # logic for cors middleware,... here // or it might be into Engine