Index: /branches/amp_4_0/platform/config/logstash_base.conf
===================================================================
--- /branches/amp_4_0/platform/config/logstash_base.conf	(nonexistent)
+++ /branches/amp_4_0/platform/config/logstash_base.conf	(working copy)
@@ -0,0 +1,17 @@
+# Basic pipeline for testing Logstash to OpenSearch
+input {
+  stdin { }
+}
+
+output {
+  stdout { }
+  opensearch {
+    hosts => ["https://127.0.0.1:9200"]
+    index => "logstash-%{+YYYY.MM.dd}"
+    user => "logstash"
+    password => "Array@123"
+    ssl => true
+    ssl_certificate_verification => true
+    cacert => "/etc/logstash/certs/root-ca.pem"
+  }
+}
Index: /branches/amp_4_0/platform/config/syslog.conf
===================================================================
--- /branches/amp_4_0/platform/config/syslog.conf	(revision 2697)
+++ /branches/amp_4_0/platform/config/syslog.conf	(working copy)
@@ -389,15 +389,17 @@
 }
 
 output {
-  elasticsearch {
-    hosts => ["http://localhost:9200"]
-    index => "acm-%{+YYYY.MM.dd}"
-    user => "logstash_internal"
-    password => "LArr@y2050"
-    template_name => "amplog"
+  stdout { }
+  opensearch {
+    hosts => ["https://127.0.0.1:9200"]
+    index => "logstash-%{+YYYY.MM.dd}"
+    user => "logstash"
+    password => "Array@123"
+    ssl => true
+    ssl_certificate_verification => true
+    cacert => "/etc/logstash/certs/root-ca.pem"
     manage_template => false
   }
-
   stdout {
     codec => rubydebug
   }
Index: /branches/amp_4_0/platform/tools/install_configure_opensearch.sh
===================================================================
--- /branches/amp_4_0/platform/tools/install_configure_opensearch.sh	(revision 2697)
+++ /branches/amp_4_0/platform/tools/install_configure_opensearch.sh	(working copy)
@@ -39,7 +39,7 @@
 # Subject Alternative Names (SANs) for node certificate
 SAN_HOSTS="DNS:localhost,IP:127.0.0.1,DNS:node-1"
 
-JAVA_HOME_PATH="/usr/lib/jvm/java-21-openjdk-21.0.7.0.6-1.el9.x86_64"
+JAVA_HOME_PATH="/usr/lib/jvm/java-21-openjdk-21.0.8.0.9-1.el9.x86_64/"
 
 # --- Functions ---
 
Index: /branches/amp_4_0/platform/tools/install_logstash_oss.sh
===================================================================
--- /branches/amp_4_0/platform/tools/install_logstash_oss.sh	(nonexistent)
+++ /branches/amp_4_0/platform/tools/install_logstash_oss.sh	(working copy)
@@ -0,0 +1,917 @@
+#!/bin/bash
+
+# Script to install Logstash OSS with OpenSearch output plugin on Rocky Linux,
+# configure a Logstash user for authentication, install PostgreSQL JDBC driver,
+# set up a syslog pipeline with jdbc_streaming filter to write output to OpenSearch acm-%{+YYYY.MM.dd} indices,
+# configure firewall rules for syslog traffic with port forwarding,
+# and create an index pattern for acm-* based on @timestamp.
+
+# --- Configuration ---
+LOGSTASH_VERSION="8.15.0"
+LOGSTASH_RPM_URL="https://artifacts.elastic.co/downloads/logstash/logstash-oss-${LOGSTASH_VERSION}-x86_64.rpm"
+LOGSTASH_SERVICE="logstash"
+LOGSTASH_CONFIG_DIR="/etc/logstash"
+LOGSTASH_CERT_DIR="${LOGSTASH_CONFIG_DIR}/certs"
+LOGSTASH_PIPELINE_CONF="${LOGSTASH_CONFIG_DIR}/conf.d/syslog.conf"
+OPENSEARCH_HOST="https://127.0.0.1:9200"
+NEW_ADMIN_PASSWORD="Arr@y2050"
+LOGSTASH_USER="logstash"
+LOGSTASH_PASSWORD="Array@123"
+FIREWALL_ZONE="public"
+JAVA_HEAP_SIZE="2g" # Heap size for plugin installations and runtime
+LOGSTASH_ROLE="logstash_custom" # Custom role to avoid static role conflicts
+
+# Certificate paths (aligned with previous OpenSearch script)
+CERT_DIR="/etc/opensearch/certs"
+CA_KEY="${CERT_DIR}/root-ca-key.pem"
+CA_CERT="${CERT_DIR}/root-ca.pem"
+LOGSTASH_KEY="${CERT_DIR}/logstash-key.pem"
+LOGSTASH_CERT="${CERT_DIR}/logstash.pem"
+LOGSTASH_PKCS8_KEY="${CERT_DIR}/logstash-key-pkcs8.pem"
+ADMIN_KEY="${CERT_DIR}/admin-key.pem"
+ADMIN_CERT="${CERT_DIR}/admin.pem"
+
+# Logstash Certificate DN (not used for authentication but kept for potential future mTLS support)
+LOGSTASH_DN="/C=IN/ST=Karnataka/L=Bengaluru/O=OpenSearchLogstash/CN=logstash"
+LOGSTASH_DN_MAPPING="CN=logstash,O=OpenSearchLogstash,L=Bengaluru,ST=Karnataka,C=IN"
+ADMIN_DN_MAPPING="C=IN,ST=Karnataka,L=Bengaluru,O=OpenSearchAdmin,CN=admin"
+
+# Java path (aligned with previous script)
+JAVA_HOME_PATH="/usr/lib/jvm/java-21-openjdk-21.0.8.0.9-1.el9.x86_64/"
+
+# JDBC driver configuration
+JDBC_DRIVER_DIR="/etc/logstash/jdbc_drivers"
+JDBC_DRIVER_FILE="${JDBC_DRIVER_DIR}/postgresql-42.7.5.jar"
+JDBC_DRIVER_URL="https://jdbc.postgresql.org/download/postgresql-42.7.5.jar"
+
+# --- Functions ---
+
+log_info() {
+    echo -e "\n\033[0;34m[INFO]\033[0m $1"
+}
+
+log_success() {
+    echo -e "\n\033[0;32m[SUCCESS]\033[0m $1"
+}
+
+log_warn() {
+    echo -e "\n\033[0;33m[WARNING]\033[0m $1"
+}
+
+log_error() {
+    echo -e "\n\033[0;31m[ERROR]\033[0m $1"
+    exit 1
+}
+
+# Check if certificate files exist
+check_cert_files() {
+    log_info "Checking CA certificate file..."
+    for file in "${CA_CERT}" "${ADMIN_KEY}" "${ADMIN_CERT}"; do
+        if [ ! -f "${file}" ]; then
+            log_error "Certificate file ${file} does not exist. Ensure OpenSearch is installed and certificates are generated."
+        fi
+    done
+    log_success "Required certificate files exist."
+}
+
+# Check for Java installation
+check_java() {
+    log_info "Checking for Java installation (OpenJDK 17 or higher required)..."
+    if command -v java &>/dev/null; then
+        JAVA_VERSION=$(java -version 2>&1 | awk -F '"' '/version/ {print $2}' | cut -d'.' -f1)
+        if (( JAVA_VERSION >= 17 )); then
+            log_success "Java ${JAVA_VERSION} found. Compatible with Logstash 8.x."
+            if [ -z "${JAVA_HOME_PATH}" ]; then
+                DETECTED_JAVA_BIN=$(readlink -f $(which java))
+                if [ -n "${DETECTED_JAVA_BIN}" ]; then
+                    JAVA_HOME_PATH=$(dirname $(dirname "${DETECTED_JAVA_BIN}"))
+                    log_info "Auto-detected JAVA_HOME_PATH: ${JAVA_HOME_PATH}"
+                else
+                    log_error "Failed to determine JAVA_HOME_PATH. Please set it manually."
+                fi
+            fi
+        else
+            log_error "Java version ${JAVA_VERSION} found. Logstash 8.x requires Java 17 or higher."
+        fi
+    else
+        log_error "Java not found. Please install Java (OpenJDK 17 or higher)."
+    fi
+}
+
+# Check and create security_admin role if missing
+check_security_admin_role() {
+    log_info "Checking for security_admin role..."
+    ROLE_RESPONSE=$(curl -s -k -X GET "${OPENSEARCH_HOST}/_plugins/_security/api/roles/security_admin" \
+        -u admin:${NEW_ADMIN_PASSWORD})
+    if echo "$ROLE_RESPONSE" | grep -q "\"security_admin\""; then
+        log_success "security_admin role exists."
+    else
+        log_info "security_admin role not found. Creating role..."
+        CREATE_ROLE_RESPONSE=$(curl -s -k -X PUT "${OPENSEARCH_HOST}/_plugins/_security/api/roles/security_admin" \
+            -u admin:${NEW_ADMIN_PASSWORD} \
+            -H "Content-Type: application/json" \
+            -d '{
+              "cluster_permissions": [
+                "cluster:admin/opendistro_security/*"
+              ]
+            }')
+        if echo "$CREATE_ROLE_RESPONSE" | grep -q "\"status\":\"CREATED\"\|\"status\":\"OK\""; then
+            log_success "security_admin role created."
+        else
+            log_error "Failed to create security_admin role. Response: $CREATE_ROLE_RESPONSE"
+        fi
+    fi
+}
+
+# Check admin certificate permissions
+check_admin_permissions() {
+    log_info "Checking admin certificate permissions..."
+    ADMIN_MAPPING=$(curl -s -k -X GET "${OPENSEARCH_HOST}/_plugins/_security/api/rolesmapping/security_admin" \
+        -u admin:${NEW_ADMIN_PASSWORD})
+    if echo "$ADMIN_MAPPING" | grep -q "${ADMIN_DN_MAPPING}"; then
+        log_success "Admin certificate has security_admin role."
+    else
+        log_info "Mapping admin DN to security_admin role..."
+        MAPPING_RESPONSE=$(curl -s -k -X PUT "${OPENSEARCH_HOST}/_plugins/_security/api/rolesmapping/security_admin" \
+            -u admin:${NEW_ADMIN_PASSWORD} \
+            -H "Content-Type: application/json" \
+            -d "{\"backend_roles\": [], \"hosts\": [], \"users\": [\"${ADMIN_DN_MAPPING}\"]}")
+        if echo "$MAPPING_RESPONSE" | grep -q "\"status\":\"CREATED\"\|\"status\":\"OK\""; then
+            log_success "Admin DN mapped to security_admin role."
+        else
+            log_error "Failed to map admin DN to security_admin role. Response: $MAPPING_RESPONSE"
+        fi
+    fi
+}
+
+# Create Logstash user in OpenSearch
+create_logstash_user() {
+    log_info "Creating Logstash user in OpenSearch..."
+    USER_RESPONSE=$(curl -s -k -X PUT "${OPENSEARCH_HOST}/_plugins/_security/api/internalusers/${LOGSTASH_USER}" \
+        -u admin:${NEW_ADMIN_PASSWORD} \
+        -H "Content-Type: application/json" \
+        -d "{\"password\": \"${LOGSTASH_PASSWORD}\"}")
+    if echo "$USER_RESPONSE" | grep -q "\"status\":\"CREATED\"\|\"status\":\"OK\""; then
+        log_success "Logstash user created."
+    else
+        log_error "Failed to create Logstash user. Response: $USER_RESPONSE"
+    fi
+}
+
+# Map Logstash user to logstash_custom role
+map_logstash_user() {
+    log_info "Mapping Logstash user to '${LOGSTASH_ROLE}' role..."
+    MAPPING_RESPONSE=$(curl -s -k -X PUT "${OPENSEARCH_HOST}/_plugins/_security/api/rolesmapping/${LOGSTASH_ROLE}" \
+        -u admin:${NEW_ADMIN_PASSWORD} \
+        -H "Content-Type: application/json" \
+        -d "{\"backend_roles\": [], \"hosts\": [], \"users\": [\"${LOGSTASH_USER}\"]}")
+    if echo "$MAPPING_RESPONSE" | grep -q "\"status\":\"CREATED\"\|\"status\":\"OK\""; then
+        log_success "Logstash user mapped to ${LOGSTASH_ROLE} role."
+    else
+        log_error "Failed to map Logstash user to ${LOGSTASH_ROLE}. Response: $MAPPING_RESPONSE"
+    fi
+
+    log_info "Verifying Logstash role mapping..."
+    MAPPING_CHECK=$(curl -s -k -X GET "${OPENSEARCH_HOST}/_plugins/_security/api/rolesmapping/${LOGSTASH_ROLE}" \
+        -u admin:${NEW_ADMIN_PASSWORD})
+    if echo "$MAPPING_CHECK" | grep -q "${LOGSTASH_USER}"; then
+        log_success "Logstash user mapping to ${LOGSTASH_ROLE} verified."
+    else
+        log_error "Logstash user mapping not found. Response: $MAPPING_CHECK"
+    fi
+}
+
+# Ensure logstash_custom role has sufficient permissions
+configure_logstash_role() {
+    log_info "Configuring ${LOGSTASH_ROLE} role in OpenSearch for acm-* indices..."
+    ROLE_RESPONSE=$(curl -s -k -X PUT "${OPENSEARCH_HOST}/_plugins/_security/api/roles/${LOGSTASH_ROLE}" \
+        -u admin:${NEW_ADMIN_PASSWORD} \
+        -H "Content-Type: application/json" \
+        -d '{
+          "cluster_permissions": ["cluster_composite_ops", "cluster_monitor"],
+          "index_permissions": [{
+            "index_patterns": ["acm-*"],
+            "allowed_actions": ["write", "create_index", "manage"]
+          }]
+        }')
+    if echo "$ROLE_RESPONSE" | grep -q "\"status\":\"CREATED\"\|\"status\":\"OK\""; then
+        log_success "${LOGSTASH_ROLE} role configured with acm-* index permissions."
+    else
+        log_error "Failed to configure ${LOGSTASH_ROLE} role. Response: $ROLE_RESPONSE"
+    fi
+}
+
+# Create index pattern for acm-* based on @timestamp
+create_index_pattern() {
+    OPENSEARCH_DASHBOARDS_HOST="https://127.0.0.1:5601"
+    log_info "Checking if OpenSearch Dashboards is running on ${OPENSEARCH_DASHBOARDS_HOST}..."
+    if curl -s -k -X GET "${OPENSEARCH_DASHBOARDS_HOST}/api/status" -u admin:${NEW_ADMIN_PASSWORD} | grep -q "overall"; then
+        log_success "OpenSearch Dashboards is running."
+    else
+        log_warn "OpenSearch Dashboards is not running, inaccessible, or authentication failed on ${OPENSEARCH_DASHBOARDS_HOST}. Skipping index pattern creation. Please ensure OpenSearch Dashboards is installed, running, and accessible with the admin user (username: admin, password: ${NEW_ADMIN_PASSWORD}), then create the index pattern manually in the Dashboards UI (Stack Management -> Index Patterns -> Create Index Pattern) with pattern 'acm-*' and time field '@timestamp'."
+        return
+    fi
+
+    log_info "Verifying admin user authentication for OpenSearch Dashboards..."
+    AUTH_RESPONSE=$(curl -s -k -X GET "${OPENSEARCH_DASHBOARDS_HOST}/api/status" -u admin:${NEW_ADMIN_PASSWORD})
+    if echo "$AUTH_RESPONSE" | grep -q "overall"; then
+        log_success "Admin user authentication verified."
+    else
+        log_error "Admin user authentication failed. Response: $AUTH_RESPONSE. Please verify the admin user credentials (username: admin, password: ${NEW_ADMIN_PASSWORD}) in OpenSearch Dashboards and ensure the user has the 'kibana_admin' role."
+    fi
+
+    log_info "Ensuring admin user has kibana_admin role for saved objects..."
+    ROLE_MAPPING_RESPONSE=$(curl -s -k -X GET "${OPENSEARCH_HOST}/_plugins/_security/api/rolesmapping/kibana_admin" \
+        -u admin:${NEW_ADMIN_PASSWORD})
+    if echo "$ROLE_MAPPING_RESPONSE" | grep -q "${ADMIN_DN_MAPPING}\|admin"; then
+        log_success "Admin user or DN already mapped to kibana_admin role."
+    else
+        log_info "Mapping admin user to kibana_admin role..."
+        MAPPING_RESPONSE=$(curl -s -k -X PUT "${OPENSEARCH_HOST}/_plugins/_security/api/rolesmapping/kibana_admin" \
+            -u admin:${NEW_ADMIN_PASSWORD} \
+            -H "Content-Type: application/json" \
+            -d "{\"backend_roles\": [], \"hosts\": [], \"users\": [\"admin\", \"${ADMIN_DN_MAPPING}\"]}")
+        if echo "$MAPPING_RESPONSE" | grep -q "\"status\":\"CREATED\"\|\"status\":\"OK\""; then
+            log_success "Admin user mapped to kibana_admin role."
+        else
+            log_error "Failed to map admin user to kibana_admin role. Response: $MAPPING_RESPONSE. Please map the role manually and retry."
+        fi
+    fi
+
+    log_info "Creating index pattern for acm-* with @timestamp as time field (ignoring SSL verification)..."
+    INDEX_PATTERN_RESPONSE=$(curl -s -k -X POST "${OPENSEARCH_DASHBOARDS_HOST}/api/saved_objects/index-pattern/acm-index-pattern" \
+        -u admin:${NEW_ADMIN_PASSWORD} \
+        -H "Content-Type: application/json" \
+        -H "osd-xsrf: true" \
+        -d '{
+          "attributes": {
+            "title": "acm-*",
+            "timeFieldName": "@timestamp"
+          }
+        }')
+    if echo "$INDEX_PATTERN_RESPONSE" | grep -q "\"type\":\"index-pattern\""; then
+        log_success "Index pattern 'acm-*' created with @timestamp as time field."
+    else
+        log_error "Failed to create index pattern. Response: $INDEX_PATTERN_RESPONSE. Please create the index pattern manually in OpenSearch Dashboards (Stack Management -> Index Patterns -> Create Index Pattern) with pattern 'acm-*' and time field '@timestamp'."
+    fi
+}
+
+# Configure firewall rules for syslog traffic
+configure_firewall() {
+    log_info "Configuring firewall for Logstash syslog input in zone ${FIREWALL_ZONE}..."
+    if systemctl is-active --quiet firewalld; then
+        # Verify firewall zone exists
+        if ! firewall-cmd --get-zones | grep -qw "${FIREWALL_ZONE}"; then
+            log_error "Firewall zone ${FIREWALL_ZONE} does not exist. Available zones: $(firewall-cmd --get-zones)"
+        fi
+
+        # Check if zone is active
+        ACTIVE_ZONES=$(firewall-cmd --get-active-zones | grep -B1 "${FIREWALL_ZONE}" | grep -v "${FIREWALL_ZONE}")
+        if [ -z "${ACTIVE_ZONES}" ]; then
+            log_warn "Firewall zone ${FIREWALL_ZONE} is not active. Ensure it is assigned to an interface."
+        fi
+
+        # Enable IP masquerading
+        if ! firewall-cmd --query-masquerade --zone="${FIREWALL_ZONE}" &>/dev/null; then
+            sudo firewall-cmd --permanent --zone="${FIREWALL_ZONE}" --add-masquerade || {
+                log_warn "Failed to enable IP masquerading in zone ${FIREWALL_ZONE}."
+            }
+            log_success "IP masquerading enabled in zone ${FIREWALL_ZONE}."
+        else
+            log_info "IP masquerading already enabled in zone ${FIREWALL_ZONE}."
+        fi
+
+        # Add port 5514 for syslog traffic (pipeline listens on 5514)
+        for PROTO in udp tcp; do
+            if ! firewall-cmd --query-port=5514/${PROTO} --zone="${FIREWALL_ZONE}" &>/dev/null; then
+                sudo firewall-cmd --permanent --zone="${FIREWALL_ZONE}" --add-port=5514/${PROTO} || {
+                    log_warn "Failed to open port 5514/${PROTO} in zone ${FIREWALL_ZONE}."
+                }
+                log_success "Opened port 5514/${PROTO} in zone ${FIREWALL_ZONE}."
+            else
+                log_info "Port 5514/${PROTO} already open in zone ${FIREWALL_ZONE}."
+            fi
+        done
+
+        # Add port forwarding from 514 to 5514
+        for PROTO in udp tcp; do
+            if ! firewall-cmd --query-forward-port=port=514:proto=${PROTO}:toport=5514 --zone="${FIREWALL_ZONE}" &>/dev/null; then
+                sudo firewall-cmd --permanent --zone="${FIREWALL_ZONE}" --add-forward-port=port=514:proto=${PROTO}:toport=5514 || {
+                    log_warn "Failed to add port forwarding from 514/${PROTO} to 5514 in zone ${FIREWALL_ZONE}."
+                }
+                log_success "Added port forwarding from 514/${PROTO} to 5514 in zone ${FIREWALL_ZONE}."
+            else
+                log_info "Port forwarding from 514/${PROTO} to 5514 already exists in zone ${FIREWALL_ZONE}."
+            fi
+        done
+
+        # Reload firewall
+        sudo firewall-cmd --reload || log_error "Failed to reload firewall."
+        log_success "Firewall reloaded successfully."
+    else
+        log_warn "Firewalld not running. Configure firewall manually to allow UDP/TCP port 5514 and forwarding from 514 to 5514."
+    fi
+}
+
+# --- Main Script ---
+
+log_info "Starting Logstash 8.15.0 installation and configuration for OpenSearch 3.x..."
+
+# 1. Check for root privileges
+if [[ $EUID -ne 0 ]]; then
+    log_error "This script must be run as root. Please use 'sudo bash $0'."
+fi
+
+# 2. Check for Java
+check_java
+
+# 3. Install Logstash OSS
+log_info "Installing Logstash OSS ${LOGSTASH_VERSION}..."
+sudo dnf install -y "${LOGSTASH_RPM_URL}" || log_error "Failed to install Logstash."
+log_success "Logstash installed."
+
+# 4. Install OpenSearch output, JDBC integration, and GeoIP plugins
+log_info "Installing logstash-output-opensearch plugin..."
+if sudo /usr/share/logstash/bin/logstash-plugin list | grep -q "logstash-output-opensearch"; then
+    log_success "logstash-output-opensearch plugin already installed."
+else
+    log_info "Attempting to install logstash-output-opensearch with ${JAVA_HEAP_SIZE} heap..."
+    export LS_JAVA_OPTS="-Xmx${JAVA_HEAP_SIZE} -Xms${JAVA_HEAP_SIZE}"
+    if sudo -E /usr/share/logstash/bin/logstash-plugin install logstash-output-opensearch; then
+        log_success "OpenSearch output plugin installed."
+    else
+        log_info "Retrying with 4g heap..."
+        export LS_JAVA_OPTS="-Xmx4g -Xms4g"
+        if sudo -E /usr/share/logstash/bin/logstash-plugin install logstash-output-opensearch; then
+            log_success "OpenSearch output plugin installed with 4g heap."
+        else
+            log_error "Failed to install logstash-output-opensearch plugin even with 4g heap."
+        fi
+    fi
+    unset LS_JAVA_OPTS
+fi
+
+log_info "Checking for logstash-integration-jdbc plugin..."
+if sudo /usr/share/logstash/bin/logstash-plugin list | grep -q "logstash-integration-jdbc"; then
+    log_success "logstash-integration-jdbc plugin already installed, providing jdbc_streaming filter."
+else
+    log_info "Attempting to install logstash-integration-jdbc with ${JAVA_HEAP_SIZE} heap..."
+    export LS_JAVA_OPTS="-Xmx${JAVA_HEAP_SIZE} -Xms${JAVA_HEAP_SIZE}"
+    if sudo -E /usr/share/logstash/bin/logstash-plugin install logstash-integration-jdbc; then
+        log_success "logstash-integration-jdbc plugin installed."
+    else
+        log_info "Retrying with 4g heap..."
+        export LS_JAVA_OPTS="-Xmx4g -Xms4g"
+        if sudo -E /usr/share/logstash/bin/logstash-plugin install logstash-integration-jdbc; then
+            log_success "logstash-integration-jdbc plugin installed with 4g heap."
+        else
+            log_error "Failed to install logstash-integration-jdbc plugin even with 4g heap."
+        fi
+    fi
+    unset LS_JAVA_OPTS
+fi
+
+log_info "Checking for logstash-filter-geoip plugin..."
+if sudo /usr/share/logstash/bin/logstash-plugin list | grep -q "logstash-filter-geoip"; then
+    log_success "logstash-filter-geoip plugin already installed."
+else
+    log_info "Attempting to install logstash-filter-geoip with ${JAVA_HEAP_SIZE} heap..."
+    export LS_JAVA_OPTS="-Xmx${JAVA_HEAP_SIZE} -Xms${JAVA_HEAP_SIZE}"
+    if sudo -E /usr/share/logstash/bin/logstash-plugin install logstash-filter-geoip; then
+        log_success "GeoIP plugin installed."
+    else
+        log_info "Retrying with 4g heap..."
+        export LS_JAVA_OPTS="-Xmx4g -Xms4g"
+        if sudo -E /usr/share/logstash/bin/logstash-plugin install logstash-filter-geoip; then
+            log_success "GeoIP plugin installed with 4g heap."
+        else
+            log_warn "Failed to install GeoIP plugin. Pipeline will function without GeoIP enrichment. To retry, run: sudo -E LS_JAVA_OPTS=\"-Xmx4g -Xms4g\" /usr/share/logstash/bin/logstash-plugin install logstash-filter-geoip"
+        fi
+    fi
+    unset LS_JAVA_OPTS
+fi
+
+# 5. Create Logstash certificate directory
+log_info "Creating Logstash certificate directory: ${LOGSTASH_CERT_DIR}..."
+sudo mkdir -p "${LOGSTASH_CERT_DIR}" || log_error "Failed to create directory ${LOGSTASH_CERT_DIR}."
+
+# 6. Copy CA certificate to Logstash directory and set permissions
+log_info "Copying CA certificate to Logstash and set permissions..."
+sudo cp "${CA_CERT}" "${LOGSTASH_CERT_DIR}/" || log_error "Failed to copy CA certificate."
+sudo chown -R logstash:logstash "${LOGSTASH_CERT_DIR}" || log_error "Failed to set ownership for Logstash certificates."
+sudo chmod 755 "${LOGSTASH_CERT_DIR}"
+sudo chmod 644 "${LOGSTASH_CERT_DIR}/root-ca.pem"
+log_success "CA certificate copied and permissions set."
+
+# 7. Install PostgreSQL JDBC driver
+log_info "Installing PostgreSQL JDBC driver..."
+sudo mkdir -p "${JDBC_DRIVER_DIR}" || log_error "Failed to create JDBC driver directory ${JDBC_DRIVER_DIR}."
+sudo curl -o "${JDBC_DRIVER_FILE}" "${JDBC_DRIVER_URL}" || log_error "Failed to download PostgreSQL JDBC driver."
+sudo chown logstash:logstash "${JDBC_DRIVER_FILE}" || log_error "Failed to set ownership for JDBC driver."
+sudo chmod 644 "${JDBC_DRIVER_FILE}" || log_error "Failed to set permissions for JDBC driver."
+sudo chown -R logstash:logstash "${JDBC_DRIVER_DIR}" || log_error "Failed to set ownership for JDBC driver directory."
+sudo chmod -R u+rwX "${JDBC_DRIVER_DIR}" || log_error "Failed to set permissions for JDBC driver directory."
+log_success "PostgreSQL JDBC driver installed."
+
+# 8. Verify existing CA and admin certificates
+check_cert_files
+
+# 9. Check and create security_admin role if missing
+check_security_admin_role
+
+# 10. Verify admin permissions
+check_admin_permissions
+
+# 11. Configure OpenSearch Security for Logstash
+log_info "Configuring OpenSearch security for Logstash authentication..."
+
+# Configure logstash_custom role with necessary permissions
+configure_logstash_role
+
+# Create Logstash user and map to role
+create_logstash_user
+map_logstash_user
+
+# 12. Create index pattern for acm-*
+create_index_pattern
+
+# 13. Configure Logstash Pipeline
+log_info "Configuring syslog pipeline for OpenSearch output to acm-%{+YYYY.MM.dd} indices..."
+
+# Backup existing pipeline config if it exists
+if [ -f "${LOGSTASH_PIPELINE_CONF}" ]; then
+    sudo cp "${LOGSTASH_PIPELINE_CONF}" "${LOGSTASH_PIPELINE_CONF}.bak_$(date +%Y%m%d%H%M%S)" || log_warn "Failed to backup existing pipeline config."
+fi
+
+cat << 'EOF' | sudo tee "${LOGSTASH_PIPELINE_CONF}" > /dev/null
+input {
+  udp {
+    port => 5514
+    type => "syslog"
+    codec => plain {
+      charset => "UTF-8"
+    }
+  }
+  tcp {
+    port => 5514
+    type => "syslog"
+    codec => plain {
+      charset => "UTF-8"
+    }
+  }
+}
+
+filter {
+  # Stage 1: Initial Cleaning
+  if [message] {
+    ruby {
+      code => '
+        if event.get("message").is_a?(String)
+          event.set("message", event.get("message").gsub(/\\x00/, ""))
+        elsif event.get("message").is_a?(Array)
+          cleaned = event.get("message").map { |m| m.gsub(/\\x00/, "") if m }
+          event.set("message", cleaned.join(" "))
+        end
+      '
+    }
+  }
+
+  if [type] == "syslog" {
+    # Stage 2: Parsing Attempts (Cascading Grok)
+
+    # Attempt 1: RFC 5424
+    grok {
+      match => {
+        "message" => "^<%{NUMBER:syslog_pri}>%{NUMBER:syslog_protocol_version} %{TIMESTAMP_ISO8601:syslog_timestamp} %{HOSTNAME:syslog_hostname} %{DATA:syslog_appname} %{DATA:syslog_procid} %{DATA:syslog_msgid} (?<syslog_structured_data>-|\\[.*?\\])(?:\\s+)?%{GREEDYDATA:syslog_message}"
+      }
+      tag_on_failure => ["_grokparsefailure_rfc5424"]
+      add_tag => ["rfc5424_attempt"]
+    }
+
+    # Parse syslog_message as AN_WELF_LOG
+    if !("_grokparsefailure_rfc5424" in [tags]) and [syslog_message] {
+      mutate {
+        strip => ["syslog_message"]
+      }
+      grok {
+        match => {
+          "syslog_message" => "^AN_WELF_LOG:id=%{WORD:log_id} time=\"%{TIMESTAMP_ISO8601:log_time}\" fw=%{IP:virtual_ip} pri=%{POSINT:priority} proto=%{WORD:protocol} src=%{IP:src_ip} dstname=%{IP:destination_name} arg=%{URIPATH:arg} op=%{WORD:operation} agent=\"%{DATA:user_agent}\" result=%{INT:http_result_code} sent=%{INT:bytes_sent} duration=%{NUMBER:duration_seconds} msg=\"%{GREEDYDATA:message_detail_raw}\""
+        }
+        tag_on_failure => ["_grokparsefailure_an_welf_log_rfc5424"]
+        add_tag => ["an_welf_log_rfc5424_subparsed"]
+      }
+      if !("_grokparsefailure_an_welf_log_rfc5424" in [tags]) and [message_detail_raw] {
+        mutate {
+          gsub => ["message_detail_raw", "\\s+", " "]
+        }
+        grok {
+          match => {
+            "message_detail_raw" => "^cache:%{WORD:cache_status} peer:%{WORD:peer_type}/%{IP:peer_ip}$"
+          }
+          tag_on_failure => ["_grokparsefailure_msg"]
+          add_tag => ["msg_subparsed"]
+        }
+      }
+    }
+
+    # Attempt 2: AN_WELF_LOG (WITH SYSLOG HEADER)
+    if "_grokparsefailure_rfc5424" in [tags] {
+      grok {
+        match => {
+          "message" => "^<%{POSINT:syslog_pri}>%{POSINT:syslog_version} %{TIMESTAMP_ISO8601:syslog_timestamp} %{HOSTNAME:syslog_hostname} %{DATA:syslog_appname} %{DATA:syslog_procid} %{INT:event_id} - AN_WELF_LOG:id=%{WORD:log_id} time=\"%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day} %{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}\" fw=%{IP:virtual_ip} pri=%{POSINT:priority} proto=%{WORD:protocol} src=%{IP:src_ip} dstname=%{IP:destination_name} arg=%{URIPATH:arg} op=%{WORD:operation} agent=\"%{DATA:user_agent}\" result=%{INT:http_result_code} sent=%{INT:bytes_sent} duration=%{NUMBER:duration_seconds} msg=\"%{GREEDYDATA:destination_data_raw}\""
+        }
+        tag_on_failure => ["_grokparsefailure_an_welf_log"]
+        add_tag => ["an_welf_log_attempt"]
+      }
+      if !("_grokparsefailure_an_welf_log" in [tags]) {
+        mutate {
+          add_field => { "log_time" => "%{year}-%{month}-%{day} %{hour}:%{minute}:%{second}" }
+          remove_field => ["year", "month", "day", "hour", "minute", "second"]
+        }
+        mutate {
+          gsub => ["destination_data_raw", "\\s+", " "]
+        }
+      }
+    }
+
+    # Attempt 3: AN_WELF_LOG (without SYSLOG HEADER)
+    if "_grokparsefailure_rfc5424" in [tags] and "_grokparsefailure_an_welf_log" in [tags] {
+      grok {
+        match => {
+          "message" => "^AN_WELF_LOG:id=%{WORD:log_id} time=\"%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day} %{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}\" fw=%{IP:virtual_ip} pri=%{POSINT:priority} proto=%{WORD:protocol} src=%{IP:src_ip} dstname=%{IP:destination_name} arg=%{URIPATH:arg} op=%{WORD:operation} agent=\"%{DATA:user_agent}\" result=%{INT:http_result_code} sent=%{INT:bytes_sent} duration=%{NUMBER:duration_seconds} msg=\"%{GREEDYDATA:message_detail_raw}\""
+        }
+        tag_on_failure => ["_grokparsefailure_an_welf_log_no_header"]
+        add_tag => ["an_welf_log_no_header_attempt"]
+      }
+      if !("_grokparsefailure_an_welf_log_no_header" in [tags]) {
+        mutate {
+          add_field => { "log_time" => "%{year}-%{month}-%{day} %{hour}:%{minute}:%{second}" }
+          remove_field => ["year", "month", "day", "hour", "minute", "second"]
+        }
+        mutate {
+          gsub => ["message_detail_raw", "\\s+", " "]
+        }
+        mutate {
+          add_field => { "syslog_priority" => "%{priority}" }
+          add_field => { "syslog_timestamp" => "%{log_time}" }
+        }
+      }
+    }
+
+    # Attempt 4: Custom/Non-Standard
+    if "_grokparsefailure_rfc5424" in [tags] and "_grokparsefailure_an_welf_log" in [tags] and "_grokparsefailure_an_welf_log_no_header" in [tags] {
+      grok {
+        match => {
+          "message" => "^<%{POSINT:syslog_pri}>%{MONTH:syslog_month} +%{MONTHDAY:syslog_day} %{YEAR:syslog_year} %{TIME:syslog_time} %{IPORHOST:syslog_hostname} %{GREEDYDATA:syslog_content_kv}"
+        }
+        tag_on_failure => ["_grokparsefailure_custom_nonstandard"]
+        add_tag => ["custom_nonstandard_attempt"]
+      }
+      if !("_grokparsefailure_custom_nonstandard" in [tags]) {
+        date {
+          match => ["syslog_month syslog_day syslog_year syslog_time", "MMM ddYYYY HH:mm:ss"]
+          target => "syslog_timestamp"
+          add_tag => ["_dateparseok"]
+          tag_on_failure => ["_dateparsefailure"]
+        }
+        kv {
+          source => "syslog_content_kv"
+          field_split => " "
+          value_split => "="
+          target => "syslog_kv_data"
+        }
+        mutate {
+          add_field => { "syslog_appname" => "%{[syslog_kv_data][id]}" }
+          add_field => { "syslog_msgid" => "%{[syslog_kv_data][type]}" }
+          add_field => { "syslog_message" => "%{[syslog_kv_data][msg]}" }
+          remove_field => ["syslog_month", "syslog_day", "syslog_year", "syslog_time", "syslog_content_kv"]
+        }
+
+        # Map fields from syslog_kv_data to common field names
+        if [syslog_kv_data][fw] {
+          mutate { add_field => { "virtual_ip" => "%{[syslog_kv_data][fw]}" } }
+        }
+        if [syslog_kv_data][src] {
+          mutate { add_field => { "client_ip" => "%{[syslog_kv_data][src]}" } }
+        }
+        if [syslog_kv_data][dst] {
+          mutate { add_field => { "destination_ip" => "%{[syslog_kv_data][dst]}" } }
+        }
+        if [syslog_kv_data][dport] {
+          mutate {
+            add_field => { "destination_port" => "%{[syslog_kv_data][dport]}" }
+            convert => { "destination_port" => "integer" }
+          }
+        }
+        # Optional: Parse log_time and timezone from KV data if needed for @timestamp
+        if [syslog_kv_data][time] {
+          date {
+            match => ["syslog_kv_data.time", "YYYY-M-d HH:mm:ss"]
+            target => "log_message_timestamp"
+            tag_on_failure => ["_kvtimeparsefailure"]
+          }
+        }
+      }
+    }
+
+    # Attempt 5: Traditional BSD Syslog
+    if "_grokparsefailure_rfc5424" in [tags] and "_grokparsefailure_an_welf_log" in [tags] and "_grokparsefailure_an_welf_log_no_header" in [tags] and "_grokparsefailure_custom_nonstandard" in [tags] {
+      grok {
+        match => {
+          "message" => "^<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{IPORHOST:syslog_hostname} %{DATA:syslog_appname}(?:\\[%{POSINT:syslog_procid}\\])?:(?: %{DATA:syslog_msgid})? %{GREEDYDATA:syslog_message}"
+        }
+        tag_on_failure => ["_grokparsefailure_bsd"]
+        add_tag => ["bsd_attempt"]
+      }
+    }
+
+    # Stage 3: Common Post-Parsing Processing
+    if !("_grokparsefailure_rfc5424" in [tags] and "_grokparsefailure_an_welf_log" in [tags] and "_grokparsefailure_an_welf_log_no_header" in [tags] and "_grokparsefailure_custom_nonstandard" in [tags] and "_grokparsefailure_bsd" in [tags]) {
+      # USERAGENT FILTER
+      if [user_agent] {
+        useragent {
+          source => "user_agent"
+          target => "useragent"
+          remove_field => ["user_agent"]
+        }
+      }
+
+      if [syslog_appname] == "-" { mutate { remove_field => ["syslog_appname"] } }
+      if [syslog_procid] == "-"  { mutate { remove_field => ["syslog_procid"]  } }
+      if [syslog_structured_data] == "-" {
+        mutate { remove_field => ["syslog_structured_data"] }
+      } else if [syslog_structured_data] =~ /^\\[.+\\]$/ {
+        # Handle structured data parsing if needed
+      }
+
+      mutate {
+        rename => {
+          "syslog_pri" => "syslog_priority"
+          "syslog_hostname" => "device_hostname"
+          "syslog_appname" => "application_name"
+          "syslog_procid" => "process_id"
+          "syslog_msgid" => "message_id"
+        }
+        rename => { "syslog_protocol_version" => "syslog_version" }
+      }
+
+      ruby {
+        code => "
+          if event.get('syslog_priority')
+            level = event.get('syslog_priority').to_i % 8
+            level_map = {
+              0 => 'Emergency', 1 => 'Alert', 2 => 'Critical', 3 => 'Error',
+              4 => 'Warning', 5 => 'Notice', 6 => 'Informational', 7 => 'Debug'
+            }
+            event.set('severity_numeric', level)
+            event.set('severity', level_map[level] || 'Unknown')
+          end
+        "
+      }
+
+      ruby {
+        code => "
+          if event.get('syslog_priority')
+            facility = event.get('syslog_priority').to_i / 8
+            facility_map = {
+              0 => 'Kernel', 1 => 'User', 2 => 'Mail', 3 => 'System Daemons',
+              4 => 'Security/Authorization', 5 => 'Syslog', 6 => 'LPR Subsystem',
+              7 => 'NNTP Subsystem', 8 => 'UUCP Subsystem', 9 => 'Clock Daemon',
+              10 => 'Security/Authorization', 11 => 'FTP Daemon', 12 => 'NTP Subsystem',
+              13 => 'Log Audit', 14 => 'Log Alert', 15 => 'Clock Daemon',
+              16 => 'Local0', 17 => 'Local1', 18 => 'Local2', 19 => 'Local3',
+              20 => 'Local4', 21 => 'Local5', 22 => 'Local6', 23 => 'Local7'
+            }
+            event.set('log_facility_numeric', facility)
+            event.set('log_facility', facility_map[facility] || 'Unknown')
+          end
+        "
+      }
+
+      # IP RENAMING LOGIC
+      mutate {
+        copy => { "[host][ip]" => "device_ip" }
+      }
+
+      if [src_ip] {
+        mutate {
+          rename => { "src_ip" => "client_ip" }
+        }
+      }
+
+      if [client_ip] {
+        geoip {
+          source => "client_ip"
+          target => "client_geoip"
+          tag_on_failure => ["_geoip_client_failed"]
+        }
+      }
+      if [client_ip] and ![client_geoip][location] {
+        mutate {
+          add_field => {
+            "[client_geoip][city_name]" => "Unknown"
+            "[client_geoip][country_name]" => "Unresolved"
+            "[client_geoip][location][lat]" => "12.9254143647407"
+            "[client_geoip][location][lon]" => "77.63149239905859"
+          }
+        }
+        mutate {
+          add_tag => ["_geoip_client_unresolved"]
+        }
+      }
+
+      if [destination_ip] {
+        geoip {
+          source => "destination_ip"
+          target => "destination_geoip"
+          tag_on_failure => ["_geoip_destination_failed"]
+        }
+      }
+      if [destination_ip] and ![destination_geoip][location] {
+        mutate {
+          add_field => {
+            "[destination_geoip][city_name]" => "Unknown"
+            "[destination_geoip][country_name]" => "Unresolved"
+            "[destination_geoip][location][lat]" => "0.0"
+            "[destination_geoip][location][lon]" => "0.0"
+          }
+        }
+        mutate {
+          add_tag => ["_geoip_destination_unresolved"]
+        }
+      }
+
+      if [device_ip] {
+        jdbc_streaming {
+          jdbc_driver_library => "/etc/logstash/jdbc_drivers/postgresql-42.7.5.jar"
+          jdbc_driver_class => "org.postgresql.Driver"
+          jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/cm"
+          jdbc_user => "amp_admin"
+          jdbc_password => "Array@123$"
+          statement => "SELECT name, type, device_group FROM device WHERE ip_address = :device_ip"
+          parameters => { "device_ip" => "device_ip" }
+          target => "device_info"
+          tag_on_failure => ["_device_lookup_failure"]
+        }
+
+        if [device_info] {
+          ruby {
+            code => "
+              if event.get('device_info') && event.get('device_info')[0]
+                device = event.get('device_info')[0]
+                event.set('device_name', device['name'])
+                event.set('device_type', device['type'])
+                event.set('device_group', device['device_group'])
+              else
+                event.tag('_device_info_not_found')
+              end
+              event.remove('device_info')
+            "
+          }
+        } else {
+          mutate {
+            add_tag => ["_device_info_not_found"]
+          }
+        }
+      }
+
+      if !("_grokparsefailure_rfc5424" in [tags]) {
+        mutate {
+          remove_tag => ["_grokparsefailure_an_welf_log", "_grokparsefailure_an_welf_log_no_header", "_grokparsefailure_custom_nonstandard", "_grokparsefailure_bsd"]
+          add_tag => ["syslog_parsed", "rfc5424"]
+        }
+      } else if !("_grokparsefailure_an_welf_log" in [tags]) {
+        mutate {
+          remove_tag => ["_grokparsefailure_rfc542
+
+4", "_grokparsefailure_an_welf_log_no_header", "_grokparsefailure_custom_nonstandard", "_grokparsefailure_bsd"]
+          add_tag => ["syslog_parsed", "an_welf_log_headered"]
+        }
+      } else if !("_grokparsefailure_an_welf_log_no_header" in [tags]) {
+        mutate {
+          remove_tag => ["_grokparsefailure_rfc5424", "_grokparsefailure_an_welf_log", "_grokparsefailure_custom_nonstandard", "_grokparsefailure_bsd"]
+          add_tag => ["syslog_parsed", "an_welf_log_no_header"]
+        }
+      } else if !("_grokparsefailure_custom_nonstandard" in [tags]) {
+        mutate {
+          remove_tag => ["_grokparsefailure_rfc5424", "_grokparsefailure_an_welf_log", "_grokparsefailure_an_welf_log_no_header", "_grokparsefailure_bsd"]
+          add_tag => ["syslog_parsed", "custom_nonstandard_log"]
+        }
+      } else if !("_grokparsefailure_bsd" in [tags]) {
+        mutate {
+          remove_tag => ["_grokparsefailure_rfc5424", "_grokparsefailure_an_welf_log", "_grokparsefailure_an_welf_log_no_header", "_grokparsefailure_custom_nonstandard"]
+          add_tag => ["syslog_parsed", "bsd_syslog"]
+        }
+      }
+
+      # Stage 4: Handle Unparsed Logs
+      if "_grokparsefailure_rfc5424" in [tags] and "_grokparsefailure_an_welf_log" in [tags] and "_grokparsefailure_an_welf_log_no_header" in [tags] and "_grokparsefailure_custom_nonstandard" in [tags] and "_grokparsefailure_bsd" in [tags] {
+        mutate {
+          add_tag => ["_parsefailure_syslog_unhandled"]
+          remove_tag => ["_grokparsefailure_rfc5424", "_grokparsefailure_an_welf_log", "_grokparsefailure_an_welf_log_no_header", "_grokparsefailure_custom_nonstandard", "_grokparsefailure_bsd"]
+        }
+      }
+    }
+  }
+}
+
+output {
+  stdout { codec => rubydebug }
+  opensearch {
+    hosts => ["https://127.0.0.1:9200"]
+    index => "acm-%{+YYYY.MM.dd}"
+    user => "logstash"
+    password => "Array@123"
+    ssl => true
+    ssl_certificate_verification => true
+    cacert => "/etc/logstash/certs/root-ca.pem"
+    manage_template => false
+  }
+}
+EOF
+
+sudo chown logstash:logstash "${LOGSTASH_PIPELINE_CONF}"
+sudo chmod 644 "${LOGSTASH_PIPELINE_CONF}"
+log_success "Logstash syslog pipeline configured to write to acm-%{+YYYY.MM.dd} indices. Customize ${LOGSTASH_PIPELINE_CONF} for production (e.g., update jdbc_connection_string, jdbc_user, jdbc_password)."
+
+# 14. Ensure Logstash data directory exists and has correct permissions
+log_info "Configuring Logstash data directory..."
+LOGSTASH_DATA_DIR="/usr/share/logstash/data"
+sudo mkdir -p "${LOGSTASH_DATA_DIR}" || log_error "Failed to create Logstash data directory ${LOGSTASH_DATA_DIR}."
+sudo chown -R logstash:logstash "${LOGSTASH_DATA_DIR}" || log_error "Failed to set ownership for Logstash data directory."
+sudo chmod -R u+rwX "${LOGSTASH_DATA_DIR}" || log_error "Failed to set permissions for Logstash data directory."
+log_success "Logstash data directory configured."
+
+# 15. Configure Logstash systemd service
+log_info "Configuring Logstash systemd service..."
+LOGSTASH_SERVICE_FILE="/usr/lib/systemd/system/${LOGSTASH_SERVICE}.service"
+
+# Backup existing service file if it exists
+if [ -f "${LOGSTASH_SERVICE_FILE}" ]; then
+    sudo cp "${LOGSTASH_SERVICE_FILE}" "${LOGSTASH_SERVICE_FILE}.bak_$(date +%Y%m%d%H%M%S)" || log_warn "Failed to backup Logstash service file."
+fi
+
+cat << EOF | sudo tee "${LOGSTASH_SERVICE_FILE}" > /dev/null
+[Unit]
+Description=Logstash
+Documentation=https://www.elastic.co/guide/en/logstash/current/index.html
+Wants=network-online.target
+After=network-online.target
+
+[Service]
+Type=simple
+User=logstash
+Group=logstash
+Environment="JAVA_HOME=${JAVA_HOME_PATH}"
+Environment="LS_JAVA_OPTS=-Xmx${JAVA_HEAP_SIZE} -Xms${JAVA_HEAP_SIZE}"
+ExecStart=/usr/share/logstash/bin/logstash --path.settings ${LOGSTASH_CONFIG_DIR}
+Restart=on-failure
+StandardOutput=journal
+StandardError=inherit
+SyslogIdentifier=logstash
+LimitNOFILE=65535
+LimitNPROC=4096
+LimitAS=infinity
+LimitFSIZE=infinity
+TimeoutStopSec=0
+KillSignal=SIGTERM
+KillMode=process
+SendSIGKILL=no
+
+[Install]
+WantedBy=multi-user.target
+EOF
+
+sudo chmod 644 "${LOGSTASH_SERVICE_FILE}"
+sudo chown root:root "${LOGSTASH_SERVICE_FILE}"
+log_success "Logstash systemd service configured."
+
+# 16. Start Logstash
+log_info "Enabling and starting Logstash service..."
+sudo systemctl daemon-reload || log_error "Failed to reload systemd daemon."
+sudo systemctl enable "${LOGSTASH_SERVICE}" || log_error "Failed to enable Logstash service."
+sudo systemctl restart "${LOGSTASH_SERVICE}" || log_error "Failed to start Logstash service."
+
+# 17. Wait for Logstash
+log_info "Waiting for Logstash to start..."
+ATTEMPTS=60
+for i in $(seq 1 $ATTEMPTS); do
+    if sudo journalctl -u logstash -n 10 | grep -q "Pipelines running"; then
+        log_success "Logstash service is up!"
+        break
+    else
+        log_info "Attempt $i/$ATTEMPTS: Logstash not ready. Waiting 10 seconds..."
+        sleep 10
+    fi
+    if [ $i -eq $ATTEMPTS ]; then
+        log_error "Logstash did not start. Check logs with 'journalctl -u logstash'."
+    fi
+done
+
+# 18. Configure Firewall
+configure_firewall
+
+# 19. Final Output
+log_success "Logstash 8.15.0 installation and configuration for OpenSearch 3.x complete!"
+log_info "Logstash is configured to process syslog messages on port 5514 (with forwarding from 514) and write to OpenSearch acm-%{+YYYY.MM.dd} indices at ${OPENSEARCH_HOST}."
+log_info "Index pattern 'acm-*' is configured with @timestamp as the time field for OpenSearch Dashboards."
\ No newline at end of file
