Logstash 采集 Nginx 日志入库

使用 Logstash 抓取官网 Nginx access.log 并写入 SR 数据库


架构概览

Nginx Access Log → Logstash → SR Database

流程说明

  1. Nginx 生成访问日志 (access.log)
  2. Logstash 读取并解析日志
  3. 数据经过过滤和转换
  4. 写入到 SR 数据库

一、Nginx 日志格式

标准日志格式

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                '$status $body_bytes_sent "$http_referer" '
                '"$http_user_agent" "$http_x_forwarded_for"';

日志示例

192.168.1.100 - - [19/Dec/2025:10:30:45 +0800] "GET /api/books HTTP/1.1" 200 1234 "https://example.com" "Mozilla/5.0" "-"

字段说明

| 字段 | 说明 | 示例 |
| --- | --- | --- |
| $remote_addr | 客户端 IP | 192.168.1.100 |
| $remote_user | 认证用户 | - |
| $time_local | 本地时间 | 19/Dec/2025:10:30:45 +0800 |
| $request | 请求方法和 URI | GET /api/books HTTP/1.1 |
| $status | HTTP 状态码 | 200 |
| $body_bytes_sent | 响应体大小(字节) | 1234 |
| $http_referer | 来源页面 | https://example.com |
| $http_user_agent | 用户代理 | Mozilla/5.0 |
| $http_x_forwarded_for | 真实 IP(代理) | - |

二、Logstash 配置

完整配置文件

创建配置文件:nginx-to-sr.conf

input {
  file {
    path => "/var/log/nginx/access.log"
    # Windows 路径示例:path => "D:/nginx/logs/access.log"
 
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb_nginx"
    # Windows 路径示例:sincedb_path => "D:/logstash/sincedb/nginx"
 
    codec => "plain"
    type => "nginx_access"
  }
}
 
filter {
  # 解析 Nginx 日志
  grok {
    match => {
      "message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:request_method} %{DATA:request_uri} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:http_user_agent}" "%{DATA:http_x_forwarded_for}"'
    }
  }
 
  # 解析时间
  date {
    match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
  }
 
  # 提取请求路径参数
  if [request_uri] {
    grok {
      match => { "request_uri" => "%{URIPATHPARAM:uri_path}" }
    }
  }
 
  # 类型转换
  mutate {
    convert => {
      "status" => "integer"
      "body_bytes_sent" => "integer"
    }
  }
 
  # 添加自定义字段
  mutate {
    add_field => {
      "log_source" => "official_website"
      "environment" => "production"
    }
  }
 
  # 过滤掉健康检查请求(可选)
  if [request_uri] =~ /health|ping/ {
    drop { }
  }
 
  # 移除不需要的字段
  mutate {
    remove_field => [ "message", "host", "path" ]
  }
}
 
output {
  # 输出到 SR 数据库
  jdbc {
    driver_class => "com.mysql.cj.jdbc.Driver"
    # SQL Server: driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
 
    connection_string => "jdbc:mysql://sr-host:3306/logs_db?useSSL=false&serverTimezone=UTC"
    # SQL Server: connection_string => "jdbc:sqlserver://sr-host:1433;databaseName=logs_db"
 
    username => "logstash_user"
    password => "${PASSWORD}"  # Logstash 环境变量替换必须使用 ${VAR} 语法,"$PASSWORD" 会被当作字面字符串
 
    statement => [
      "INSERT INTO nginx_access_logs (
        remote_addr, remote_user, time_local, request_method, request_uri,
        http_version, status, body_bytes_sent, http_referer, http_user_agent,
        http_x_forwarded_for, log_source, environment, created_at
      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())",
      "remote_addr", "remote_user", "time_local", "request_method", "request_uri",
      "http_version", "status", "body_bytes_sent", "http_referer", "http_user_agent",
      "http_x_forwarded_for", "log_source", "environment"
    ]
  }
 
  # 调试输出(可选)
  # stdout {
  #   codec => rubydebug
  # }
}

三、SR 数据库表结构

建表 SQL

CREATE TABLE nginx_access_logs (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    remote_addr VARCHAR(50) NOT NULL,
    remote_user VARCHAR(50) DEFAULT NULL,
    time_local DATETIME NOT NULL,
    request_method VARCHAR(10) NOT NULL,
    request_uri TEXT NOT NULL,
    http_version VARCHAR(10) NOT NULL,
    status INT NOT NULL,
    body_bytes_sent INT DEFAULT 0,
    http_referer TEXT,
    http_user_agent TEXT,
    http_x_forwarded_for VARCHAR(50) DEFAULT NULL,
    log_source VARCHAR(50) DEFAULT 'official_website',
    environment VARCHAR(20) DEFAULT 'production',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_remote_addr (remote_addr),
    INDEX idx_time_local (time_local),
    INDEX idx_status (status),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Nginx 访问日志';

索引优化建议

-- 按日期分区(可选,适合大数据量)
-- 注意:MySQL 要求分区键必须包含在表的所有唯一键(含主键)中,
-- 直接对上述建表语句执行会报错,需先调整主键:
ALTER TABLE nginx_access_logs
    DROP PRIMARY KEY,
    ADD PRIMARY KEY (id, time_local);

ALTER TABLE nginx_access_logs
PARTITION BY RANGE (TO_DAYS(time_local)) (
    PARTITION p202512 VALUES LESS THAN (TO_DAYS('2026-01-01')),
    PARTITION p202601 VALUES LESS THAN (TO_DAYS('2026-02-01')),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);
 
-- 复合索引(根据查询需求)
CREATE INDEX idx_time_status ON nginx_access_logs(time_local, status);
CREATE INDEX idx_uri_path ON nginx_access_logs(request_uri(100));

四、部署与运行

1. 安装 Logstash

# 使用 Docker
docker pull docker.elastic.co/logstash/logstash:8.11.0
 
# 或手动安装
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.11.0-linux-x86_64.tar.gz
tar -xzf logstash-8.11.0-linux-x86_64.tar.gz

2. 添加 JDBC 驱动

# MySQL
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.33/mysql-connector-java-8.0.33.jar
cp mysql-connector-java-8.0.33.jar /usr/share/logstash/logstash-core/lib/jars/
 
# SQL Server
wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/12.2.0.jre11/mssql-jdbc-12.2.0.jre11.jar
cp mssql-jdbc-12.2.0.jre11.jar /usr/share/logstash/logstash-core/lib/jars/

3. 测试配置

# 验证配置文件语法
/usr/share/logstash/bin/logstash -f nginx-to-sr.conf --config.test_and_exit
 
# 启用配置自动重载(修改配置文件后无需重启;如需查看详细输出请加 --log.level=debug)
/usr/share/logstash/bin/logstash -f nginx-to-sr.conf --config.reload.automatic

4. 启动 Logstash

# 前台运行(调试)
/usr/share/logstash/bin/logstash -f nginx-to-sr.conf
 
# 后台运行
nohup /usr/share/logstash/bin/logstash -f nginx-to-sr.conf > /var/log/logstash.log 2>&1 &
 
# 使用 systemd(推荐)
sudo systemctl start logstash
sudo systemctl enable logstash

5. Docker 部署

docker run -d \
  --name logstash-nginx \
  -v /path/to/nginx-to-sr.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v /var/log/nginx:/var/log/nginx:ro \
  -v /path/to/mysql-connector.jar:/usr/share/logstash/logstash-core/lib/jars/mysql-connector.jar \
  docker.elastic.co/logstash/logstash:8.11.0

五、监控与优化

监控脚本

#!/bin/bash
# check_logstash.sh
 
# 检查进程
if ! pgrep -f logstash > /dev/null; then
    echo "Logstash is not running!"
    exit 1
fi
 
# 检查日志延迟
LAST_LOG=$(tail -1 /var/log/nginx/access.log | awk '{print $4}' | tr -d '[')
CURRENT_TIME=$(date +"%d/%b/%Y:%H:%M:%S")
 
echo "Last log: $LAST_LOG"
echo "Current time: $CURRENT_TIME"
 
# 检查数据库写入
DB_COUNT=$(mysql -h sr-host -u user -ppass -D logs_db -se "SELECT COUNT(*) FROM nginx_access_logs WHERE created_at > DATE_SUB(NOW(), INTERVAL 5 MINUTE)")
echo "Recent records: $DB_COUNT"

性能优化

1. 批量写入

output {
  jdbc {
    # 启用批量插入
    statement => "INSERT INTO nginx_access_logs (...) VALUES (...)"
    flush_size => 1000
    idle_flush_time => 10
  }
}

2. 多线程处理

input {
  file {
    path => "/var/log/nginx/access.log"
  }
}
 
filter {
  # ... 过滤器配置
}
 
# 添加管道配置
# 编辑 logstash.yml
# pipeline.workers: 4
# pipeline.batch.size: 1000

3. 日志轮转配置

# /etc/logrotate.d/nginx
/var/log/nginx/*.log {
    daily
    rotate 7
    missingok
    compress
    delaycompress
    notifempty
    create 0640 nginx nginx
    sharedscripts
    postrotate
        [ -f /var/run/nginx.pid ] && kill -USR1 `cat /var/run/nginx.pid`
    endscript
}

六、常见问题

Q1: Logstash 无法解析日志格式

解决方案

# 1. 测试 grok 模式
cd /usr/share/logstash
bin/logstash -e '
input { stdin { } }
filter {
  grok {
    match => { "message" => "你的 grok 模式" }
  }
}
output { stdout { codec => rubydebug } }
'
 
# 2. 使用 Kibana 自带的 Grok Debugger(Dev Tools → Grok Debugger)
# 注:grokdebug.herokuapp.com 已随 Heroku 免费计划关闭而下线

Q2: JDBC 连接失败

检查清单

  • JDBC 驱动是否正确放置
  • 数据库地址和端口是否正确
  • 用户名密码是否正确
  • 数据库用户是否有 INSERT 权限
  • 防火墙是否开放端口
-- 检查用户权限
SHOW GRANTS FOR 'logstash_user'@'%';
 
-- 授权(如需要)
GRANT INSERT, SELECT ON logs_db.* TO 'logstash_user'@'%';
FLUSH PRIVILEGES;

Q3: 数据重复插入

原因:sincedb 文件丢失或重置

解决方案

input {
  file {
    path => "/var/log/nginx/access.log"
    # 指定固定的 sincedb 路径
    sincedb_path => "/var/lib/logstash/sincedb/nginx_access"
    # 从最后位置开始读取(而不是 beginning)
    start_position => "end"
  }
}

Q4: 性能瓶颈

优化方案

  1. 增加 pipeline workers
  2. 启用批量写入
  3. 添加数据库索引
  4. 使用 Redis 作为缓冲队列
# 添加 Redis 缓冲
input {
  redis {
    host => "redis-host"
    port => 6379
    data_type => "list"
    key => "logstash:nginx"
  }
}

Q5: 日志文件轮转后无法读取

解决方案

input {
  file {
    path => "/var/log/nginx/access.log*"
    exclude => "*.gz"
    # 自动发现新文件
    discover_interval => 15
  }
}

七、数据分析查询

常用统计 SQL

-- 按小时统计访问量
SELECT
    DATE_FORMAT(time_local, '%Y-%m-%d %H:00:00') AS hour,
    COUNT(*) AS request_count,
    SUM(body_bytes_sent) AS total_bytes
FROM nginx_access_logs
WHERE time_local >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
GROUP BY hour
ORDER BY hour DESC;
 
-- Top 10 访问 IP
SELECT
    remote_addr,
    COUNT(*) AS request_count
FROM nginx_access_logs
WHERE time_local >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
GROUP BY remote_addr
ORDER BY request_count DESC
LIMIT 10;
 
-- HTTP 状态码分布
SELECT
    status,
    COUNT(*) AS count,
    ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM nginx_access_logs WHERE time_local >= DATE_SUB(NOW(), INTERVAL 24 HOUR)), 2) AS percentage
FROM nginx_access_logs
WHERE time_local >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
GROUP BY status
ORDER BY count DESC;
 
-- 热门 API 端点
SELECT
    request_uri,
    COUNT(*) AS request_count,
    AVG(body_bytes_sent) AS avg_response_size
FROM nginx_access_logs
WHERE time_local >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
    AND request_method = 'GET'
GROUP BY request_uri
ORDER BY request_count DESC
LIMIT 20;
 
-- 慢响应检测(需要在 Nginx 日志中添加响应时间)
SELECT
    remote_addr,
    request_uri,
    status,
    time_local
FROM nginx_access_logs
WHERE time_local >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
    AND status >= 500
ORDER BY time_local DESC;

八、告警配置

异常状态码告警

-- 创建告警视图
CREATE VIEW v_error_alerts AS
SELECT
    DATE_FORMAT(time_local, '%Y-%m-%d %H:%i:00') AS alert_time,
    COUNT(*) AS error_count,
    GROUP_CONCAT(DISTINCT status) AS status_codes
FROM nginx_access_logs
WHERE time_local >= DATE_SUB(NOW(), INTERVAL 5 MINUTE)
    AND status >= 500
GROUP BY alert_time
HAVING error_count > 10;

监控脚本(配合 crontab)

#!/bin/bash
# alert_nginx_errors.sh
 
ERROR_COUNT=$(mysql -h sr-host -u user -ppass -D logs_db -se "
    SELECT COUNT(*)
    FROM nginx_access_logs
    WHERE time_local >= DATE_SUB(NOW(), INTERVAL 5 MINUTE)
        AND status >= 500
")
 
if [ "$ERROR_COUNT" -gt 10 ]; then
    # 发送告警(可以接入钉钉、企业微信等)
    curl -X POST https://your-alert-webhook \
        -H 'Content-Type: application/json' \
        -d "{\"msg\": \"Nginx error count: $ERROR_COUNT in last 5 minutes\"}"
fi

九、快速启动命令

PowerShell 函数(添加到 Profile)

# 查看 Logstash 状态
function k9s-logstash-status {
    docker ps | grep logstash
}
 
# 查看 Logstash 日志
function k9s-logstash-logs {
    docker logs -f logstash-nginx
}
 
# 重启 Logstash
function k9s-logstash-restart {
    docker restart logstash-nginx
}
 
# 查看最近的访问日志(从数据库)
function k9s-nginx-logs {
    mysql -h sr-host -u user -ppass -D logs_db -e "
        SELECT time_local, remote_addr, request_method, request_uri, status
        FROM nginx_access_logs
        ORDER BY time_local DESC
        LIMIT 20
    "
}

参考:PowerShell-效率配置指南


相关笔记


参考资源


最后更新: 2025-12-19