From 2d5e0284b60f232a8a1e056914810ca441d13dd9 Mon Sep 17 00:00:00 2001 From: Codyjackson0321 Date: Fri, 20 Mar 2026 23:33:03 -0400 Subject: [PATCH 001/141] Initial LogstashAgent setup feat: Added logstashagent.example.yml feat: Added agent_policies html and js feat: Updated connection modal to work with enrolling Logstash agents feat: Enabled Logstash Agent button refactor: segment Connection/Pipeline Manager views into manager_views / editor_views since they share the same app refactor: segmented tests --- LogstashAgent/logstashagent.example.yml | 13 + LogstashAgent/logstashagent.yml | 11 +- LogstashUI/PipelineManager/editor_views.py | 433 ++++++++++ LogstashUI/PipelineManager/manager_views.py | 508 ++++++++++++ .../static/js/agent_policies.js | 0 .../pipeline_manager/agent_policies.html | 10 + .../pipeline_manager/connection_modal.html | 112 ++- .../templates/pipeline_manager.html | 22 +- .../tests/test_editor_views.py | 751 ++++++++++++++++++ .../tests/test_manager_views.py | 735 +++++++++++++++++ LogstashUI/PipelineManager/urls.py | 47 +- 11 files changed, 2593 insertions(+), 49 deletions(-) create mode 100644 LogstashAgent/logstashagent.example.yml create mode 100644 LogstashUI/PipelineManager/editor_views.py create mode 100644 LogstashUI/PipelineManager/manager_views.py create mode 100644 LogstashUI/PipelineManager/static/js/agent_policies.js create mode 100644 LogstashUI/PipelineManager/templates/components/pipeline_manager/agent_policies.html create mode 100644 LogstashUI/PipelineManager/tests/test_editor_views.py create mode 100644 LogstashUI/PipelineManager/tests/test_manager_views.py diff --git a/LogstashAgent/logstashagent.example.yml b/LogstashAgent/logstashagent.example.yml new file mode 100644 index 00000000..9a09dc83 --- /dev/null +++ b/LogstashAgent/logstashagent.example.yml @@ -0,0 +1,13 @@ + +# Mode determines if the agent is used for simulating +# in LogstashUI, or for controlling an actual Logstash host +mode: simulation # simulation | host + +# Only applies if mode 'mode' is set to simulation +simulation_mode: embedded # embedded | host + + +# Linux example, default +logstash_binary: /usr/share/logstash/bin/logstash +logstash_settings: /etc/logstash +logstash_log_path: /var/log/logstash \ No newline at end of file diff --git a/LogstashAgent/logstashagent.yml b/LogstashAgent/logstashagent.yml index 4a1f3241..12b0dad4 100644 --- a/LogstashAgent/logstashagent.yml +++ b/LogstashAgent/logstashagent.yml @@ -1,5 +1,12 @@ -mode: simulation -simulation_mode: embedded + +# Mode determines if the agent is used for simulating +# in LogstashUI, or for controlling an actual Logstash host +mode: host # simulation | host + +# Only applies if mode 'mode' is set to simulation +simulation_mode: embedded # embedded | host + + # Linux example, default logstash_binary: /usr/share/logstash/bin/logstash logstash_settings: /etc/logstash diff --git a/LogstashUI/PipelineManager/editor_views.py b/LogstashUI/PipelineManager/editor_views.py new file mode 100644 index 00000000..ee430ee6 --- /dev/null +++ b/LogstashUI/PipelineManager/editor_views.py @@ -0,0 +1,433 @@ +#Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +#or more contributor license agreements. Licensed under the Elastic License; +#you may not use this file except in compliance with the Elastic License. + +from django.shortcuts import render +from django.http import HttpResponse, JsonResponse, HttpResponseBadRequest +from django.conf import settings + +from Common.decorators import require_admin_role +from Common.logstash_utils import get_logstash_pipeline +from Common.elastic_utils import get_elastic_connection, query_elasticsearch_documents, \ + get_elastic_connections_from_list, get_elasticsearch_indices, get_elasticsearch_field_mappings +from Common.validators import validate_pipeline_name +from Common import logstash_config_parse + +from datetime import datetime, timezone +from html import escape + +import json +import os +import logging +import traceback +import difflib + +logger = logging.getLogger(__name__) + + +def _load_plugin_data(): + app_dir = os.path.dirname(os.path.abspath(__file__)) + json_path = os.path.join(app_dir, 'data', 'plugins.json') + with open(json_path, 'r', encoding='utf-8') as f: + data = json.load(f) + return data + + +def PipelineEditor(request): + from django.conf import settings + + context = { + "plugin_data": _load_plugin_data(), + "simulation_mode": settings.LOGSTASHUI_CONFIG.get('simulation', {}).get('mode', 'embedded') + } + + if request.method == "GET": + es_id = request.GET.get("es_id") + pipeline_name = request.GET.get("pipeline") + + # Validate required parameters + if not es_id or not pipeline_name: + return HttpResponseBadRequest("Missing required parameters: es_id and pipeline") + + pipeline_config = get_logstash_pipeline(es_id, pipeline_name) + + # Handle case where pipeline couldn't be fetched + if not pipeline_config: + return HttpResponseBadRequest(f"Could not fetch pipeline '{pipeline_name}' from connection {es_id}") + + context['pipeline_text'] = pipeline_config['pipeline'] + + # Flatten pipeline settings with defaults for template access + settings = pipeline_config.get('pipeline_settings', {}) + context['pipeline_settings'] = { + 'description': pipeline_config.get('description', ''), + 'pipeline_workers': settings.get('pipeline.workers', 1), + 'pipeline_batch_size': settings.get('pipeline.batch.size', 128), + 'pipeline_batch_delay': settings.get('pipeline.batch.delay', 50), + 'queue_type': settings.get('queue.type', 'memory'), + 'queue_max_bytes': settings.get('queue.max_bytes', '1gb'), + 'queue_checkpoint_writes': settings.get('queue.checkpoint.writes', 1024), + } + context['pipeline_name'] = pipeline_name + context['parsing_error'] = None + + try: + parsed_config = logstash_config_parse.logstash_config_to_components(context['pipeline_text']) + # logstash_config_to_components returns a JSON string, so parse it back to a dict + context['component_data'] = json.loads(parsed_config) + except Exception as e: + # Capture the parsing error to show to the user + context['parsing_error'] = str(e) + context['component_data'] = { + "input": [], + "filter": [], + "output": [] + } + + return render(request, "pipeline_editor.html", context=context) + + +def GetCurrentPipelineCode(request, components={}): + if not components: + data = json.loads(request.POST.get("components")) + else: + data = components + parser = logstash_config_parse.ComponentToPipeline(data) + config = parser.components_to_logstash_config() + + # Return the code wrapped in a pre tag with proper formatting + return HttpResponse( + f'
{escape(config)}
', + content_type="text/html" + ) + + +@require_admin_role +def SavePipeline(request): + if "save_pipeline" in request.POST: + pipeline_name = request.POST.get("pipeline") + + # Validate pipeline name + is_valid, error_msg = validate_pipeline_name(pipeline_name) + if not is_valid: + return HttpResponse( + f'

{error_msg}

', + status=400 + ) + + # Check if we have raw pipeline config (from Text mode) or components (from UI mode) + pipeline_config = request.POST.get("pipeline_config") + + if pipeline_config: + # Use the raw pipeline config directly from Text mode + config = pipeline_config + logger.info(f"Saving pipeline from Text mode (raw config)") + else: + # Generate config from components (UI mode) + components_json = request.POST.get("components") + if not components_json: + return HttpResponse( + f'

Missing pipeline configuration

', + status=400 + ) + + data = json.loads(components_json) + add_ids = request.POST.get("add_ids", "false").lower() == "true" + parser = logstash_config_parse.ComponentToPipeline(data, add_ids=add_ids) + config = parser.components_to_logstash_config() + + # Validate that the generated config can be converted back to components + try: + logstash_config_parse.logstash_config_to_components(config) + except Exception as e: + # If conversion fails, return detailed error to user + error_message = escape(str(e)) + return HttpResponse( + f"""
+

We're sorry! Something went wrong in the conversion of your pipeline!

+
+{error_message} +
+

Please report this issue to us so we can fix it!!

+
""", + status=400 + ) + + es = get_elastic_connection(request.POST.get("es_id")) + current_pipeline_config = es.logstash.get_pipeline(id=pipeline_name) + + pipeline_data = current_pipeline_config.get(pipeline_name, {}) + + es.logstash.put_pipeline(id=pipeline_name, body={ + "pipeline": config, + "last_modified": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', + "pipeline_metadata": pipeline_data.get('pipeline_metadata', {"version": 1, "type": "logstash_pipeline"}), + "username": "LogstashUI", + "pipeline_settings": pipeline_data.get('pipeline_settings', {}), + "description": pipeline_data.get('description', '') + } + ) + + logger.info( + f"User '{request.user.username}' saved pipeline '{pipeline_name}' (Connection ID: {request.POST.get('es_id')})") + return HttpResponse("Pipeline saved successfully!") + + return HttpResponse("Invalid request", status=400) + + +def ComponentsToConfig(request): + """Convert components JSON to Logstash configuration text""" + if request.method == "POST": + try: + components_json = request.POST.get("components") + if not components_json: + return HttpResponse("No components provided", status=400) + + # Parse components + components = json.loads(components_json) + + # Convert to config using the same logic as SavePipeline + parser = logstash_config_parse.ComponentToPipeline(components, add_ids=False) + config = parser.components_to_logstash_config() + + # Return plain text config + return HttpResponse(config, content_type="text/plain") + except Exception as e: + logger.error(f"Error converting components to config: {str(e)}") + return HttpResponse(f"Error: {str(e)}", status=500) + + return HttpResponse("Method not allowed", status=405) + + +def ConfigToComponents(request): + """Convert Logstash configuration text to components JSON""" + if request.method == "POST": + try: + config_text = request.POST.get("config_text") + if not config_text: + return JsonResponse({"error": "No config text provided"}, status=400) + + # Parse config text to components + components = logstash_config_parse.logstash_config_to_components(config_text) + + # Return components as JSON with safe=False to allow nested structures + return JsonResponse(components, safe=False) + except Exception as e: + logger.error(f"Error converting config to components: {str(e)}") + return JsonResponse({"error": str(e)}, status=500) + + return JsonResponse({"error": "Method not allowed"}, status=405) + + +def GetDiff(request): + """Generate a unified diff between current and new pipeline configurations""" + if request.method == "POST": + es_id = request.POST.get("es_id") + pipeline_name = request.POST.get("pipeline") + pipeline_text = request.POST.get("pipeline_text") # Raw text from Text mode + components_json = request.POST.get("components") + add_ids = request.POST.get("add_ids", "false").lower() == "true" + + # Need either pipeline_text or components + if not es_id or not pipeline_name or (not pipeline_text and not components_json): + return JsonResponse({"error": "Missing required parameters"}, status=400) + + try: + # Get the current pipeline from Elasticsearch + current_pipeline = get_logstash_pipeline(es_id, pipeline_name)['pipeline'] + + # Generate the new pipeline - either from raw text or from components + if pipeline_text: + # Use the raw text directly from Text mode + new_pipeline = pipeline_text + else: + # Generate from components (UI mode) + components = json.loads(components_json) + parser = logstash_config_parse.ComponentToPipeline(components, add_ids=add_ids) + new_pipeline = parser.components_to_logstash_config() + + # Generate unified diff + diff = difflib.unified_diff( + current_pipeline.splitlines(keepends=True), + new_pipeline.splitlines(keepends=True), + fromfile='Current Pipeline', + tofile='New Pipeline (After Save)', + lineterm='' + ) + + # Convert to string + diff_text = ''.join(diff) + + # Calculate stats + current_lines = len(current_pipeline.splitlines()) + new_lines = len(new_pipeline.splitlines()) + line_diff = new_lines - current_lines + diff_sign = '+' if line_diff > 0 else '' + stats = f"Current: {current_lines} lines | New: {new_lines} lines ({diff_sign}{line_diff})" + + return JsonResponse({ + 'diff': diff_text, + 'stats': stats, + 'current': current_pipeline, + 'new': new_pipeline + }) + + except Exception as e: + logger.error(f"Error generating diff: {str(e)}") + return JsonResponse({"error": f"Error generating diff: {str(e)}"}, status=500) + + return JsonResponse({"error": "Method not allowed"}, status=405) + + +def GetElasticsearchConnections(request): + """ + Get all Elasticsearch connections for simulation input + """ + try: + # Use existing function that returns connections with ES clients + connections_list = get_elastic_connections_from_list() + + # Format for dropdown: extract id and name + connections = [{'id': conn['id'], 'name': conn['name']} for conn in connections_list] + + return JsonResponse({"connections": connections}) + except Exception as e: + logger.error(f"Error fetching Elasticsearch connections: {e}") + return JsonResponse({"error": str(e)}, status=500) + + +def GetElasticsearchIndices(request): + """ + Get Elasticsearch indices with typeahead support + """ + + connection_id = request.GET.get("connection_id") + pattern = request.GET.get("pattern", "*") + + if not connection_id: + return JsonResponse({"error": "connection_id is required"}, status=400) + + try: + indices = get_elasticsearch_indices(connection_id, pattern) + return JsonResponse({"indices": indices}) + except Exception as e: + logger.error(f"Error fetching Elasticsearch indices: {e}") + return JsonResponse({"error": str(e)}, status=500) + + +def GetElasticsearchFields(request): + """ + Get field mappings from an Elasticsearch index + """ + + connection_id = request.GET.get("connection_id") + index = request.GET.get("index") + + if not connection_id or not index: + return JsonResponse({"error": "connection_id and index are required"}, status=400) + + try: + fields = get_elasticsearch_field_mappings(connection_id, index) + return JsonResponse({"fields": fields}) + except Exception as e: + logger.error(f"Error fetching Elasticsearch fields: {e}") + return JsonResponse({"error": str(e)}, status=500) + + +def QueryElasticsearchDocuments(request): + """ + Query Elasticsearch documents for simulation + """ + + connection_id = request.POST.get("connection_id") + index = request.POST.get("index") + query_method = request.POST.get("query_method") # 'field' or 'docid' + + if not connection_id or not index: + return JsonResponse({"error": "connection_id and index are required"}, status=400) + + try: + if query_method == "docid": + doc_ids = request.POST.get("doc_ids", "").strip().split("\n") + doc_ids = [d.strip() for d in doc_ids if d.strip()] + documents = query_elasticsearch_documents(connection_id, index, doc_ids=doc_ids) + elif query_method == "entire": + # Entire document - fetch with all fields + size = int(request.POST.get("size", 10)) + query = request.POST.get("query", "") + documents = query_elasticsearch_documents( + connection_id, index, field=None, size=size, query_string=query + ) + else: # field method + field = request.POST.get("field") + size = int(request.POST.get("size", 10)) + query = request.POST.get("query", "") + + if not field: + return JsonResponse({"error": "field is required for field-based queries"}, status=400) + + documents = query_elasticsearch_documents( + connection_id, index, field=field, size=size, query_string=query + ) + + return JsonResponse({"documents": documents}) + except Exception as e: + logger.error(f"Error querying Elasticsearch documents: {e}") + return JsonResponse({"error": str(e)}, status=500) + + +def GetPluginDocumentation(request): + """ + Securely proxy plugin documentation URLs with allowlist validation. + Only allows documentation from trusted Elastic/Logstash domains. + """ + plugin_type = request.GET.get("type") + plugin_name = request.GET.get("name") + + if not plugin_type or not plugin_name: + return JsonResponse({"error": "type and name are required"}, status=400) + + try: + # Load plugin data to get the documentation URL + plugin_data = _load_plugin_data() + + if plugin_type not in plugin_data: + return JsonResponse({"error": f"Invalid plugin type: {plugin_type}"}, status=400) + + if plugin_name not in plugin_data[plugin_type]: + return JsonResponse({"error": f"Plugin not found: {plugin_name}"}, status=404) + + plugin = plugin_data[plugin_type][plugin_name] + doc_url = plugin.get("link") + + if not doc_url: + return JsonResponse({"error": "No documentation URL available for this plugin"}, status=404) + + # Allowlist of trusted documentation domains + ALLOWED_DOC_DOMAINS = [ + "www.elastic.co", + "elastic.co", + "github.com", + "rubydoc.info" + ] + + # Parse and validate the URL + from urllib.parse import urlparse + parsed_url = urlparse(doc_url) + + # Check if domain is in allowlist + if not any(parsed_url.netloc.endswith(domain) or parsed_url.netloc == domain + for domain in ALLOWED_DOC_DOMAINS): + logger.warning(f"Blocked documentation URL from untrusted domain: {doc_url}") + return JsonResponse({"error": "Documentation URL is not from a trusted domain"}, status=403) + + # Return the validated URL (frontend will use it in iframe) + return JsonResponse({ + "url": doc_url, + "plugin_name": plugin_name, + "plugin_type": plugin_type + }) + + except Exception as e: + logger.error(f"Error fetching plugin documentation: {e}") + return JsonResponse({"error": str(e)}, status=500) diff --git a/LogstashUI/PipelineManager/manager_views.py b/LogstashUI/PipelineManager/manager_views.py new file mode 100644 index 00000000..f274a126 --- /dev/null +++ b/LogstashUI/PipelineManager/manager_views.py @@ -0,0 +1,508 @@ +#Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +#or more contributor license agreements. Licensed under the Elastic License; +#you may not use this file except in compliance with the Elastic License. + +from django.shortcuts import render +from django.http import HttpResponse, JsonResponse +from django.template.loader import get_template +from django.conf import settings + +from .forms import ConnectionForm +from PipelineManager.models import Connection as ConnectionTable + +from Common.decorators import require_admin_role +from Common.logstash_utils import get_logstash_pipeline +from Common.elastic_utils import get_elastic_connection, test_elastic_connectivity +from Common.validators import validate_pipeline_name + +from datetime import datetime, timezone +from html import escape + +import logging +import requests + +logger = logging.getLogger(__name__) + + +@require_admin_role +def AgentPolicies(request): + """ + View for managing Logstash Agent Policies + """ + context = {} + return render(request, "components/pipeline_manager/agent_policies.html", context=context) + + +def PipelineManager(request): + """Builds the table of pipelines""" + context = {} + connections = list(ConnectionTable.objects.values("connection_type", "name", "host", "cloud_id", "cloud_url", "pk")) + + context['connections'] = connections + context['has_connections'] = len(connections) > 0 + context['form'] = ConnectionForm() + + return render(request, "pipeline_manager.html", context=context) + + +def test_connectivity(connection_id): + """ + Test connectivity to an Elasticsearch connection. + Pure Python function for programmatic use. + + Args: + connection_id: ID of the connection to test + + Returns: + tuple: (success: bool, message: str) + """ + if not connection_id: + return (False, "No connection ID provided") + + try: + elastic_connection = get_elastic_connection(connection_id) + result = test_elastic_connectivity(elastic_connection) + return (True, result) + except Exception as e: + error_msg = str(e) + logger.error(f"Connection test against {connection_id} failed: {error_msg}") + return (False, error_msg) + + +def TestConnectivity(request): + """ + Django view to test connectivity to an Elasticsearch connection. + Returns HTML response for HTMX. + """ + test_id = request.GET.get('test') + + if not test_id: + return HttpResponse("No connection ID provided", status=400) + + logger.info(f"User '{request.user.username}' testing connection {test_id}") + success, message = test_connectivity(test_id) + + if success: + return HttpResponse(""" +
+

{0}

+
+ """.format(escape(str(message)))) + else: + return HttpResponse(""" +
+

Connection failed: {0}

+
+ """.format(escape(str(message)))) + + +def GetConnections(request): + """Get all connections for dropdown population""" + try: + connections = ConnectionTable.objects.all().values('id', 'name', 'connection_type') + return JsonResponse(list(connections), safe=False, status=200) + except Exception as e: + return JsonResponse({'error': str(e)}, status=500) + + +@require_admin_role +def AddConnection(request): + if request.method == "POST": + + form = ConnectionForm(request.POST) + + if form.is_valid(): + # Save the connection temporarily + new_connection = form.save() + + # Test the connection + success, message = test_connectivity(new_connection.id) + + if not success: + # If test fails, delete the connection and return JSON error + new_connection.delete() + logger.error(f"User '{request.user.username}' failed to add connection, {new_connection.id}") + + return JsonResponse({ + 'success': False, + 'error': str(message) + }, status=200) + + # Connection test succeeded, return JSON response + logger.info(f"User '{request.user.username}' added a new connection, {new_connection.id}") + logger.info(f"Returning success response with connection ID: {new_connection.id}") + return JsonResponse({ + 'success': True, + 'connection_id': new_connection.id, + 'message': 'Connection created and tested successfully!' + }, status=200) + else: + logger.warning(f"User '{request.user.username}' failed to add connection: {form.errors}") + return JsonResponse({ + 'success': False, + 'error': str(form.errors) + }, status=200) + + return JsonResponse({'error': 'Invalid request method'}, status=405) + + +@require_admin_role +def DeleteConnection(request, connection_id=None): + if request.method != "POST": + return HttpResponse("Method not allowed", status=405) + + if not connection_id: + return HttpResponse("Connection ID is required", status=400) + + connection = ConnectionTable.objects.filter(id=connection_id).first() + if not connection: + return HttpResponse( + '
Connection not found
', + status=404 + ) + + connection_name = connection.name + connection.delete() + logger.warning( + f"User '{request.user.username}' deleted connection '{connection_name}' (ID: {connection_id})") + + return HttpResponse(""" + + """) + + +def GetPipelines(request, connection_id): + context = {} + try: + connection = ConnectionTable.objects.get(pk=connection_id) + except ConnectionTable.DoesNotExist: + return HttpResponse( + '
Connection not found
', + status=404 + ) + + logstash_pipelines = [] + if connection.connection_type == "CENTRALIZED": + # --- Gets our pipelines from the connection + try: + es = get_elastic_connection(connection.id) + pipelines = es.logstash.get_pipeline() + + for pipeline_name, pipeline_data in pipelines.items(): + # Format last_modified timestamp + last_modified_str = pipeline_data.get("last_modified", "") + formatted_date = "" + if last_modified_str: + try: + # Parse ISO 8601 format: 2025-11-23T05:30:52.421Z + dt = datetime.fromisoformat(last_modified_str.replace('Z', '+00:00')) + # Format as "Tuesday, January 14th 2025" + day = dt.day + suffix = 'th' if 11 <= day <= 13 else {1: 'st', 2: 'nd', 3: 'rd'}.get(day % 10, 'th') + formatted_date = dt.strftime(f'%A, %B {day}{suffix} %Y') + except Exception: + formatted_date = last_modified_str # Fallback to original if parsing fails + + logstash_pipelines.append( + { + "es_id": connection.id, + "es_name": connection.name, + "name": pipeline_name, + "description": pipeline_data.get("description", ""), + "last_modified": formatted_date + } + ) + + except Exception as e: + logger.exception("Couldn't connect to Elastic") + + context['pipelines'] = logstash_pipelines + context['es_id'] = connection.id + + logstash_template = get_template("components/pipeline_manager/collapsible_row.html") + html = logstash_template.render(context) + return HttpResponse(html) + + +@require_admin_role +def UpdatePipelineSettings(request): + if request.method == "POST": + try: + es_id = request.POST.get("es_id") + pipeline_name = request.POST.get("pipeline") + + # Validate required fields + if not es_id or not pipeline_name: + return HttpResponse( + '
Error: Missing pipeline ID or connection ID
', + status=400 + ) + + # Validate pipeline name + is_valid, error_msg = validate_pipeline_name(pipeline_name) + if not is_valid: + return HttpResponse( + f'
{error_msg}
', + status=400 + ) + + # Get form values + description = request.POST.get("description", "") + pipeline_workers = request.POST.get("pipeline_workers") + pipeline_batch_size = request.POST.get("pipeline_batch_size") + pipeline_batch_delay = request.POST.get("pipeline_batch_delay") + queue_type = request.POST.get("queue_type") + queue_max_bytes = request.POST.get("queue_max_bytes") + queue_max_bytes_unit = request.POST.get("queue_max_bytes_unit") + queue_checkpoint_writes = request.POST.get("queue_checkpoint_writes") + + # Build settings body - only include non-empty values + current_pipeline_config = get_logstash_pipeline(es_id, pipeline_name) + settings_body = { + "pipeline": current_pipeline_config['pipeline'], + "last_modified": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', + "pipeline_metadata": { + "version": current_pipeline_config['pipeline_metadata']['version'] + 1, + "type": "logstash_pipeline" + }, + "username": "LogstashUI", + "pipeline_settings": {}, + + } + + if 'description' in current_pipeline_config: + settings_body['description'] = current_pipeline_config['description'] + + if description: + settings_body["description"] = description + if pipeline_workers: + settings_body['pipeline_settings']["pipeline.workers"] = int(pipeline_workers) + if pipeline_batch_size: + settings_body['pipeline_settings']["pipeline.batch.size"] = int(pipeline_batch_size) + if pipeline_batch_delay: + settings_body['pipeline_settings']["pipeline.batch.delay"] = int(pipeline_batch_delay) + if queue_type: + settings_body['pipeline_settings']["queue.type"] = queue_type + if queue_max_bytes: + settings_body['pipeline_settings']["queue.max_bytes"] = f"{queue_max_bytes}{queue_max_bytes_unit}" + if queue_checkpoint_writes: + settings_body['pipeline_settings']["queue.checkpoint.writes"] = int(queue_checkpoint_writes) + + # Get Elasticsearch connection and update pipeline settings + es = get_elastic_connection(es_id) + es.logstash.put_pipeline(id=pipeline_name, body=settings_body) + + logger.info( + f"User '{request.user.username}' updated settings for pipeline '{pipeline_name}' (Connection ID: {es_id})") + # Return empty response - toast notification handled by JavaScript + return HttpResponse('', status=200) + + except Exception as e: + # Return simple error message - toast notification handled by JavaScript + logger.error(f"Error updating pipeline settings: {str(e)}") + return HttpResponse(str(e), status=500) + + return HttpResponse('Invalid request method', status=405) + + +@require_admin_role +def CreatePipeline(request, simulate=False, pipeline_name=None, pipeline_config=None): + """ + Create a pipeline in Elasticsearch or LogstashAgent. + + Args: + request: Django request object + simulate: If True, send to LogstashAgent instead of Elasticsearch + pipeline_name: Pipeline name (used when called directly for simulation) + pipeline_config: Pipeline config string (used when called directly for simulation) + """ + + if request.method == "POST" or simulate: + # Get parameters from POST or function arguments + if not simulate: + es_id = request.POST.get("es_id") + pipeline_name = request.POST.get("pipeline") + pipeline_config = request.POST.get("pipeline_config", "").strip() + + # Validate pipeline name + is_valid, error_msg = validate_pipeline_name(pipeline_name) + if not is_valid: + return HttpResponse(error_msg, status=400) + + # Use provided config or default empty config + if pipeline_config: + pipeline_content = pipeline_config + else: + pipeline_content = "input {}\nfilter {}\noutput {}" + + # Build the pipeline body + pipeline_body = { + "pipeline": pipeline_content, + "last_modified": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', + "pipeline_metadata": { + "version": 1, + "type": "logstash_pipeline" + }, + "username": "LogstashUI", + "pipeline_settings": { + "pipeline.batch.delay": 50, + "pipeline.batch.size": 125, + "pipeline.workers": 1, + "queue.checkpoint.writes": 1024, + "queue.max_bytes": "1gb", + "queue.type": "memory" + }, + "description": "" + } + + if simulate: + # Send to LogstashAgent + logstash_agent_url = f"{settings.LOGSTASH_AGENT_URL}/_logstash/pipeline/{pipeline_name}" + + try: + response = requests.put( + logstash_agent_url, + json=pipeline_body, + verify=False, # --insecure equivalent + timeout=10 + ) + response.raise_for_status() + logger.info( + f"User '{request.user.username}' created simulation pipeline '{pipeline_name}' in LogstashAgent") + return HttpResponse("Simulation pipeline created successfully!", status=200) + except requests.exceptions.RequestException as e: + logger.error(f"Failed to create simulation pipeline in LogstashAgent: {e}") + return HttpResponse(f"Failed to create simulation pipeline: {str(e)}", status=500) + else: + # Send to Elasticsearch + es = get_elastic_connection(es_id) + pipeline_doc = es.logstash.put_pipeline( + id=pipeline_name, + body=pipeline_body + ) + + logger.info( + f"User '{request.user.username}' created new pipeline '{pipeline_name}' (Connection ID: {es_id})") + response = HttpResponse("Pipeline created successfully!") + response['HX-Redirect'] = f'/ConnectionManager/Pipelines/Editor/?es_id={es_id}&pipeline={pipeline_name}' + return response + + +@require_admin_role +def DeletePipeline(request): + if request.method == "POST": + # Handle both JSON and form data + import json + if request.content_type == 'application/json': + data = json.loads(request.body) + es_id = data.get("es_id") + pipeline_name = data.get("pipeline") + else: + es_id = request.POST.get("es_id") + pipeline_name = request.POST.get("pipeline") + + # Validate pipeline name + is_valid, error_msg = validate_pipeline_name(pipeline_name) + if not is_valid: + return HttpResponse(error_msg, status=400) + + es = get_elastic_connection(es_id) + es.logstash.delete_pipeline(id=pipeline_name) + + logger.warning(f"User '{request.user.username}' deleted pipeline '{pipeline_name}' (Connection ID: {es_id})") + return HttpResponse(status=204) # No content - prevents text from being inserted into page + + +@require_admin_role +def ClonePipeline(request): + if request.method == "POST": + es_id = request.POST.get("es_id") + source_pipeline = request.POST.get("source_pipeline") + new_pipeline = request.POST.get("new_pipeline") + + # Validate source pipeline name + is_valid, error_msg = validate_pipeline_name(source_pipeline) + if not is_valid: + return HttpResponse(f"Invalid source pipeline name: {error_msg}", status=400) + + # Validate new pipeline name + is_valid, error_msg = validate_pipeline_name(new_pipeline) + if not is_valid: + return HttpResponse(error_msg, status=400) + + try: + es = get_elastic_connection(es_id) + + # Get the source pipeline configuration + source_config = es.logstash.get_pipeline(id=source_pipeline) + + if source_pipeline not in source_config: + return HttpResponse(f"Source pipeline '{source_pipeline}' not found", status=404) + + source_data = source_config[source_pipeline] + + # Check if new pipeline name already exists + existing_pipelines = es.logstash.get_pipeline() + if new_pipeline in existing_pipelines: + return HttpResponse(f"Pipeline '{new_pipeline}' already exists. Please choose a different name.", + status=400) + + # Create the new pipeline with the same configuration as the source + es.logstash.put_pipeline( + id=new_pipeline, + body={ + "pipeline": source_data['pipeline'], + "last_modified": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', + "pipeline_metadata": { + "version": 1, + "type": "logstash_pipeline" + }, + "username": "LogstashUI", + "pipeline_settings": source_data.get('pipeline_settings', {}), + "description": source_data.get('description', f"Cloned from {source_pipeline}") + } + ) + + logger.info( + f"User '{request.user.username}' cloned pipeline '{source_pipeline}' to '{new_pipeline}' (Connection ID: {es_id})") + + # Close the modal and refresh the pipeline list + response = HttpResponse(""" + + """) + return response + + except Exception as e: + logger.error(f"Error cloning pipeline: {str(e)}") + return HttpResponse(f"Error cloning pipeline: {str(e)}", status=500) + + +def GetPipeline(request): + if request.method == "GET": + es_id = request.GET.get("es_id") + pipeline_name = request.GET.get("pipeline") + + # Validate required parameters + if not es_id or not pipeline_name: + return JsonResponse({"error": "Missing required parameters: es_id and pipeline"}, status=400) + + pipeline_config = get_logstash_pipeline(es_id, pipeline_name) + + # Handle case where pipeline couldn't be fetched + if not pipeline_config: + return JsonResponse({"error": f"Could not fetch pipeline '{pipeline_name}' from connection {es_id}"}, status=400) + + pipeline_string = pipeline_config['pipeline'] + + return JsonResponse({"code": pipeline_string}) diff --git a/LogstashUI/PipelineManager/static/js/agent_policies.js b/LogstashUI/PipelineManager/static/js/agent_policies.js new file mode 100644 index 00000000..e69de29b diff --git a/LogstashUI/PipelineManager/templates/components/pipeline_manager/agent_policies.html b/LogstashUI/PipelineManager/templates/components/pipeline_manager/agent_policies.html new file mode 100644 index 00000000..566549bd --- /dev/null +++ b/LogstashUI/PipelineManager/templates/components/pipeline_manager/agent_policies.html @@ -0,0 +1,10 @@ + + + + + Title + + + + + \ No newline at end of file diff --git a/LogstashUI/PipelineManager/templates/components/pipeline_manager/connection_modal.html b/LogstashUI/PipelineManager/templates/components/pipeline_manager/connection_modal.html index 4a6538e6..2223308d 100644 --- a/LogstashUI/PipelineManager/templates/components/pipeline_manager/connection_modal.html +++ b/LogstashUI/PipelineManager/templates/components/pipeline_manager/connection_modal.html @@ -73,8 +73,11 @@

Error

"> {% csrf_token %} - -
+ + + + +
@@ -85,9 +88,6 @@

Error

A descriptive name for this connection

- - -
@@ -167,13 +167,44 @@

Error

- +