
Building a Kafka-Based Log Collection and Analysis Platform


Overall architecture

Hostname      IP               Description
keepalived-1  192.168.175.110  Reverse proxy 1, high availability via keepalived
keepalived-2  192.168.175.111  Reverse proxy 2, high availability via keepalived
nginx-1       192.168.175.120  Runs nginx and filebeat
nginx-2       192.168.175.121  Runs nginx and filebeat
nginx-3       192.168.175.122  Runs nginx and filebeat
kafka-zp01    192.168.175.133  Runs kafka and zookeeper
kafka-zp02    192.168.175.134  Runs kafka and zookeeper
kafka-zp03    192.168.175.135  Runs kafka and zookeeper

Preparation

1. Prepare 8 virtual machines (CentOS 7, 2 cores, 1 GB RAM).
2. Configure a static IP address and set the hostname on every machine.
  • Set the hostname

    [root@localhost ~]# hostnamectl set-hostname nginx-1
    [root@localhost ~]# su	# log in again so the new hostname takes effect
    [root@nginx-1 ~]#
  • Configure a static IP address

    Edit /etc/sysconfig/network-scripts/ifcfg-ens33, using nginx-1 as an example:

    [root@nginx-1 ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
    TYPE=Ethernet
    PROXY_METHOD=none
    BROWSER_ONLY=no
    BOOTPROTO=static	# static IP
    DEFROUTE=yes
    IPV4_FAILURE_FATAL=no
    IPV6INIT=yes
    IPV6_AUTOCONF=yes
    IPV6_DEFROUTE=yes
    IPV6_FAILURE_FATAL=no
    IPV6_ADDR_GEN_MODE=stable-privacy
    NAME=ens33
    UUID=933b7c00-91f4-4ad1-911d-d913192d3161
    DEVICE=ens33
    ONBOOT=yes
    IPADDR=192.168.175.120	# IP address (this is nginx-1; use each host's own address from the table above)
    GATEWAY=192.168.175.2	# gateway
    DNS1=114.114.114.114	# DNS
    PREFIX=24

    Restart the network service:

    [root@nginx-1 ~]# service network restart
3. Disable the firewall and SELinux
  • Stop and disable the firewall

    systemctl stop firewalld
    systemctl disable firewalld
  • Disable SELinux

    Temporary (until reboot):

    setenforce 0

    Permanent:

    vim /etc/selinux/config
    # change SELINUX from enforcing to disabled
    SELINUX=disabled
4. On every machine, add /etc/hosts entries for the kafka/zookeeper cluster nodes
192.168.175.133  kafka-zp01
192.168.175.134  kafka-zp02
192.168.175.135  kafka-zp03

Setting up keepalived dual-VIP high availability

Target machines: keepalived-1, keepalived-2

1. Install keepalived
yum install keepalived -y
2. Configure keepalived
  • Configure keepalived-1 as MASTER for VIP1 (192.168.175.188) and BACKUP for VIP2 (192.168.175.198)

    [root@keepalived-1 ~]# vim /etc/keepalived/keepalived.conf
    vrrp_instance VI_1 {
        state MASTER
        interface ens33
        virtual_router_id 51    # virtual router id: distinguishes keepalived clusters on the same LAN; every host in the same cluster uses the same id
        priority 120
        advert_int 1
        authentication {
            auth_type PASS
            auth_pass 1111
        }
        virtual_ipaddress {
            192.168.175.188
        }
    }
    vrrp_instance VI_2 {
        state BACKUP
        interface ens33
        virtual_router_id 52
        priority 100
        advert_int 1
        authentication {
            auth_type PASS
            auth_pass 1111
        }
        virtual_ipaddress {
            192.168.175.198
        }
    }
  • Configure keepalived-2 as MASTER for VIP2 (192.168.175.198) and BACKUP for VIP1 (192.168.175.188)

    [root@keepalived-2 ~]# vim /etc/keepalived/keepalived.conf
    vrrp_instance VI_1 {
        state BACKUP
        interface ens33
        virtual_router_id 51    # same virtual router id as on keepalived-1, since both hosts belong to the same VRRP instance
        priority 100
        advert_int 1
        authentication {
            auth_type PASS
            auth_pass 1111
        }
        virtual_ipaddress {
            192.168.175.188
        }
    }
    vrrp_instance VI_2 {
        state MASTER
        interface ens33
        virtual_router_id 52
        priority 120
        advert_int 1
        authentication {
            auth_type PASS
            auth_pass 1111
        }
        virtual_ipaddress {
            192.168.175.198
        }
    }
3. Restart keepalived
systemctl restart keepalived
4. Verify

Use the ip add command to check which node currently holds each VIP:

[root@keepalived-1 ~]# ip add
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UNKNOWN group default qlen 1000
    link/ether 00:0c:29:0b:4e:b7 brd ff:ff:ff:ff:ff:ff
    inet 192.168.175.110/24 brd 192.168.175.255 scope global dynamic ens33
       valid_lft 1510sec preferred_lft 1510sec
    inet 192.168.175.188/32 scope global ens33
       valid_lft forever preferred_lft forever

[root@keepalived-2 ~]# ip add
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UNKNOWN group default qlen 1000
    link/ether 00:0c:29:ff:e9:c1 brd ff:ff:ff:ff:ff:ff
    inet 192.168.175.111/24 brd 192.168.175.255 scope global dynamic ens33
       valid_lft 1510sec preferred_lft 1510sec
    inet 192.168.175.198/32 scope global ens33
       valid_lft forever preferred_lft forever
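
To verify failover, you can stop keepalived on one node and confirm that its VIP moves to the other node (a minimal sketch using the same commands as above):

    # on keepalived-1: simulate a failure
    systemctl stop keepalived

    # on keepalived-2: ip add should now show both 192.168.175.188 and 192.168.175.198 on ens33
    ip add

    # restore keepalived-1; VIP1 moves back because its priority (120) is higher than keepalived-2's (100)
    systemctl start keepalived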

Setting up the nginx cluster

Target machines: nginx-1, nginx-2, nginx-3

1. Install nginx
yum install epel-release -y	# install the EPEL repository
yum install nginx -y
2. Start nginx and enable it at boot
systemctl start nginx	# start nginx
systemctl enable nginx	# enable at boot
3. Edit the configuration files
[root@nginx-1 ~]# cd /etc/nginx/
[root@nginx-1 nginx]# ls
conf.d        fastcgi.conf.default    koi-utf     mime.types.default  scgi_params          uwsgi_params.default
default.d     fastcgi_params          koi-win     nginx.conf          scgi_params.default  win-utf
fastcgi.conf  fastcgi_params.default  mime.types  nginx.conf.default  uwsgi_params

The main configuration file, nginx.conf, has the following structure:

...                        # global block
events {                   # events block
    ...
}
http {                     # http block
    ...                    # http global block
    server {               # server block
        ...                # server global block
        location [PATTERN] {   # location block
            ...
        }
        location [PATTERN] {
            ...
        }
    }
    server {
        ...
    }
    ...                    # http global block
}

1. Global block: directives that affect nginx as a whole, such as the user/group the worker processes run as, the pid file path, log paths, included configuration files, and the number of worker processes.

2. events block: settings that affect network connections between nginx and its clients, such as the maximum number of connections per worker, which event-driven model to use, whether multiple connections can be accepted at once, and connection serialization.

3. http block: can contain multiple server blocks and holds most functionality and third-party module configuration: proxying, caching, log format definitions, file includes, mime-type definitions, sendfile, connection timeouts, requests per connection, and so on.

4. server block: virtual-host parameters; one http block can contain multiple server blocks.

5. location block: request routing and how specific paths are handled.

Edit the configuration: add include /etc/nginx/conf.d/*.conf; to the http block of nginx.conf.

Create the conf.d directory under /etc/nginx and add a configuration file sc.conf in it with the following content:

vim /etc/nginx/conf.d/sc.conf
server {
    listen 80 default_server;
    server_name www.sc.com;
    root /usr/share/nginx/html;                    # document root
    access_log /var/log/nginx/sc/access.log main;  # log path; main is the log format defined in the main config
    location / {
    }
}

Check the syntax with nginx -t:

[root@nginx-1 html]# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: [emerg] open() "/var/log/nginx/sc/access.log" failed (2: No such file or directory)
nginx: configuration file /etc/nginx/nginx.conf test failed
[root@nginx-1 html]# mkdir /var/log/nginx/sc
[root@nginx-1 html]# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful

Reload nginx:

nginx -s  reload

nginx reverse proxy configuration

Target machines: keepalived-1, keepalived-2

Configure the nginx service on keepalived-1 and keepalived-2 to reverse-proxy requests to the nginx cluster.

1. Install nginx
yum install epel-release -y	# install the EPEL repository
yum install nginx -y
2. Start nginx and enable it at boot
systemctl start nginx	# start nginx
systemctl enable nginx	# enable at boot
3. Modify the configuration
  • Edit /etc/nginx/nginx.conf and add include /etc/nginx/conf.d/*.conf; to the http block

  • Create the conf.d directory under /etc/nginx and add a configuration file sc.conf in it with the following content:

    upstream nginx_backend {
        server 192.168.175.120:80;   # nginx-1
        server 192.168.175.121:80;   # nginx-2
        server 192.168.175.122:80;   # nginx-3
    }
    server {
        listen 80 default_server;
        root /usr/share/nginx/html;
        location / {
            proxy_pass http://nginx_backend;
        }
    }
  • Check the syntax with nginx -t; once it passes, reload nginx with nginx -s reload

4. Result

When VIP1 (192.168.175.188) or VIP2 (192.168.175.198) is accessed, the request is automatically proxied to the backend nginx cluster.
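
A quick check from any host on the LAN (a minimal sketch; since the proxy listens as default_server, no Host header is required):

    curl http://192.168.175.188/
    curl http://192.168.175.198/
    # each request should be answered by one of nginx-1/2/3 and logged
    # in /var/log/nginx/sc/access.log on the backend that served it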

Setting up the kafka and zookeeper clusters

Target machines: kafka-zp01, kafka-zp02, kafka-zp03

1. Install

Install Java:

yum install java wget -y

Download kafka:

wget http://mirrors.aliyun.com/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz

Unpack:

tar xf kafka_2.12-2.8.1.tgz

Download zookeeper:

wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

Unpack:

tar xf apache-zookeeper-3.6.3-bin.tar.gz
2. Configure kafka

Edit kafka_2.12-2.8.1/config/server.properties. On kafka-zp01:

broker.id=1
listeners=PLAINTEXT://kafka-zp01:9092
zookeeper.connect=192.168.175.133:2181,192.168.175.134:2181,192.168.175.135:2181

On kafka-zp02 and kafka-zp03, use broker.id=2 and broker.id=3 respectively, and set listeners to the local hostname.

Kafka stores its message data in the logs directory under the kafka installation directory. Cleanup can be configured along two dimensions:

1. by time (for example, 7 days)
2. by size

Whichever of the two conditions (time or size) is met first triggers log cleanup.
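
For reference, both limits are controlled by broker settings in config/server.properties. The values below are only an illustrative sketch (log.retention.hours defaults to 168, i.e. 7 days):

log.retention.hours=168                  # delete segments older than 7 days
log.retention.bytes=1073741824           # or once a partition exceeds about 1 GB (-1 means unlimited)
log.segment.bytes=1073741824             # roll to a new segment after about 1 GB
log.retention.check.interval.ms=300000   # how often the cleaner checks the conditions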

Kafka stores the data of each partition in segments.

Suppose a partition has the following segments:
00.log 11.log 22.log

The file name is the offset of the first message stored in that segment, so:

00.log holds messages 1 through 11 (offsets 0 to 10),
11.log holds messages 12 through 22 (offsets 11 to 21),
22.log holds everything after message 22.

3. Configure zookeeper

Go into apache-zookeeper-3.6.3-bin/conf:

cp zoo_sample.cfg zoo.cfg

Edit zoo.cfg and add the following three lines:

server.1=192.168.175.133:3888:4888
server.2=192.168.175.134:3888:4888
server.3=192.168.175.135:3888:4888

3888 and 4888 are ports: one is used for data transfer between the nodes, the other for liveness checks and leader election.
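
For context, a minimal zoo.cfg on each node would look roughly like this (dataDir and clientPort are the values from zoo_sample.cfg; the three server lines are the ones added above):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
server.1=192.168.175.133:3888:4888
server.2=192.168.175.134:3888:4888
server.3=192.168.175.135:3888:4888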

Create the /tmp/zookeeper directory (the dataDir configured in zoo_sample.cfg) and put a myid file in it containing this node's zookeeper id.

For example, on 192.168.175.133:

echo 1 > /tmp/zookeeper/myid
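
Likewise on the other two nodes (each id must match that host's server.N line in zoo.cfg):

# on 192.168.175.134
mkdir -p /tmp/zookeeper
echo 2 > /tmp/zookeeper/myid

# on 192.168.175.135
mkdir -p /tmp/zookeeper
echo 3 > /tmp/zookeeper/myid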

Start zookeeper (on all three nodes):

apache-zookeeper-3.6.3-bin/bin/zkServer.sh start

When starting the services, always start zookeeper first, then kafka.

When shutting down, stop kafka first, then zookeeper.

Check the status:

[root@kafka-zp03 apache-zookeeper-3.6.3-bin]# bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

Start kafka (on all three nodes):

bin/kafka-server-start.sh -daemon config/server.properties

Using zookeeper:

zookeeper is a distributed, open-source configuration management service, similar to etcd.

  • Run the client

    bin/zkCli.sh
    [zk: localhost:2181(CONNECTED) 1] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, sc, zookeeper]
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
    [1, 2, 3]
    [zk: localhost:2181(CONNECTED) 3] create /sc/yy
    Created /sc/yy
    [zk: localhost:2181(CONNECTED) 4] ls /sc
    [page, xx, yy]
    [zk: localhost:2181(CONNECTED) 5] set /sc/yy 90
    [zk: localhost:2181(CONNECTED) 6] get /sc/yy
    90

Testing

1. Create a topic
[root@kafka-zp01 kafka_2.12-2.8.1]# bin/kafka-topics.sh --create --zookeeper 192.168.175.133:2181 --replication-factor 1 --partitions 1 --topic pxb
2. List topics
[root@kafka-zp01 kafka_2.12-2.8.1]# bin/kafka-topics.sh --list --zookeeper 192.168.175.133:2181
3. Start a console producer
[root@kafka-zp01 kafka_2.12-2.8.1]# bin/kafka-console-producer.sh --broker-list 192.168.175.133:9092 --topic pxb
>sdajagd
>adajg
>adadw
4. Start a console consumer
[root@kafka-zp02 kafka_2.12-2.8.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.175.133:9092 --topic pxb --from-beginning

The consumer now prints the messages typed into the producer, which shows the test succeeded.
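
If the consumer prints nothing, it can help to check the topic's partition and replica layout first (a sketch using the same kafka-topics.sh tool):

bin/kafka-topics.sh --describe --zookeeper 192.168.175.133:2181 --topic pxb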

Deploying filebeat

Target machines: nginx-1, nginx-2, nginx-3

1. Install
  • Import the Elastic GPG key
rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
  • Create the repo file: vim /etc/yum.repos.d/fb.repo
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
  • Install with yum
yum install filebeat -y
rpm -qa | grep filebeat   # check whether filebeat is installed; rpm -qa lists all installed packages
rpm -ql filebeat          # see where filebeat was installed and which files it owns
  • Enable at boot
systemctl enable filebeat

For reference, the filebeat.inputs section of the YAML configuration below corresponds to a structure like this (shown as JSON):

{
  "filebeat.inputs": [
    {
      "type": "log",
      "enabled": true,
      "paths": ["/var/log/nginx/sc/access.log"]
    }
  ]
}

2. Configure

Edit the configuration file /etc/filebeat/filebeat.yml:

filebeat.inputs:
- type: log
  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/nginx/sc/access.log
# ------------------------------ kafka output ------------------------------
output.kafka:
  hosts: ["192.168.175.133:9092","192.168.175.134:9092","192.168.175.135:9092"]
  topic: nginxlog
  keep_alive: 10s
3. Create the nginxlog topic (on a kafka node)
bin/kafka-topics.sh --create --zookeeper 192.168.175.133:2181 --replication-factor 3 --partitions 1 --topic nginxlog
4. Start the service
systemctl start  filebeat
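
filebeat's built-in test subcommands (available in filebeat 7.x) can confirm that the configuration parses and that the kafka brokers are reachable; a quick sketch:

filebeat test config     # validate /etc/filebeat/filebeat.yml
filebeat test output     # try to connect to the configured kafka hosts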

Generating logs by accessing the nginx cluster

Target machine: kafka-zp01

1. In /etc/hosts, add entries that resolve www.pxb.com to VIP1 and VIP2

vim /etc/hosts

192.168.175.188	www.pxb.com
192.168.175.198	www.pxb.com
2. Request www.pxb.com to generate log entries
curl  www.pxb.com
3. Start a consumer to read the logs
[root@kafka-zp01 kafka_2.12-2.8.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.175.134:9092 --topic nginxlog --from-beginning

Loading the data into a database

1. Requirements

We need the ip, time and bandwidth fields from the nginx logs.

The ip field is resolved to the corresponding province and ISP.
Fields stored in the database: ip, time, province, ISP, bandwidth.

2. Steps
  • Create the database table
  • Write a Python script that reads the nginx logs from kafka
  • Extract the ip, time and bandwidth fields from each log line
  • Resolve the extracted ip field to a province and ISP via Taobao's IP lookup API
    url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=114.114.114.114"
  • Reformat the time field as "2021-10-12 12:00:00"
  • Insert the record into the database
3. Implementation

Create the database and table:
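
The Python script below connects to a MySQL database named nginx_log on 192.168.175.135 as root. If that database does not exist yet, create it first (a sketch; it assumes MySQL/MariaDB is already installed and running on that host):

CREATE DATABASE IF NOT EXISTS nginx_log DEFAULT CHARSET utf8;
USE nginx_log;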

CREATE TABLE `log` (
  `ip` varchar(50) DEFAULT NULL,
  `time` datetime DEFAULT NULL,
  `dk` varchar(30) DEFAULT NULL,
  `isp` varchar(50) DEFAULT NULL,
  `prov` varchar(30) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Write the Python script: vim python_consumer.py

#!/bin/python3
#encoding:utf-8
import json
import requests
import time
import pymysql
from pykafka import KafkaClient

# connect to the database
db = pymysql.connect(
    host = "192.168.175.135",
    user = "root",
    passwd = "123456",
    database = "nginx_log"
)
cursor = db.cursor()

taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="

# look up an ip address (province and ISP) via the taobao API
def resolv_ip(ip):
    response = requests.get(taobao_url + ip)
    if response.status_code == 200:
        tmp_dict = json.loads(response.text)
        prov = tmp_dict["data"]["region"]
        isp = tmp_dict["data"]["isp"]
        return prov, isp
    return None, None

# convert the time format used in the nginx log into the format we want
def trans_time(dt):
    # parse the string into a time struct
    timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
    #timeStamp = int(time.mktime(timeArray))
    # format the time struct back into a string
    new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
    return new_time

# read data from kafka and extract the ip, time and bandwidth fields
client = KafkaClient(hosts="192.168.175.133:9092,192.168.175.134:9092,192.168.175.135:9092")
topic = client.topics['nginxlog']
balanced_consumer = topic.get_balanced_consumer(
    consumer_group = 'testgroup',
    auto_commit_enable = True,
    zookeeper_connect = '192.168.175.133:2181,192.168.175.134:2181,192.168.175.135:2181'
)
#consumer = topic.get_simple_consumer()
for message in balanced_consumer:
    if message is not None:
        line = json.loads(message.value.decode("utf-8"))
        log = line["message"]
        tmp_lst = log.split()
        ip = tmp_lst[0]
        dt = tmp_lst[3].replace("[", "")
        bt = tmp_lst[9]
        dt = trans_time(dt)
        prov, isp = resolv_ip(ip)
        insert = "insert into log(ip, time, dk, prov, isp) values(%s, %s, %s, %s, %s)"
        values = (ip, dt, bt, prov, isp)
        cursor.execute(insert, values)
        db.commit()
        if prov and isp:
            print(prov, isp, dt)
db.close()
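
A note on dependencies: the script needs pykafka, pymysql and requests, none of which ship with CentOS 7 by default. A sketch of installing them (assuming python3/pip3, which on CentOS 7 can be installed via yum):

yum install python3 -y
pip3 install pykafka pymysql requests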
4. Results

Run the Python script.

Check the database to confirm the parsed records are being inserted.

Troubleshooting

Problem encountered:

After restarting the kafka service, ls /brokers/ids in the zookeeper client returns an empty list.

Solution:

Check logs/server.log under the kafka directory:

kafka.common.InconsistentClusterIdException: The Cluster ID yjMbix25TJ6VpXSNceA72w doesn't match stored clusterId Some(yqcsYK4cSJKkhfu6kR0_Yw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
	at kafka.server.KafkaServer.startup(KafkaServer.scala:252)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
	at kafka.Kafka$.main(Kafka.scala:82)
	at kafka.Kafka.main(Kafka.scala)

Edit meta.properties in the kafka data directory, set cluster.id to yjMbix25TJ6VpXSNceA72w, and then restart the kafka service.
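
A sketch of that fix (the data directory is whatever log.dirs points to in server.properties, often /tmp/kafka-logs by default; the cluster.id value must be the one reported in the exception):

grep log.dirs config/server.properties        # find the kafka data directory
vim /tmp/kafka-logs/meta.properties           # set cluster.id=yjMbix25TJ6VpXSNceA72w
bin/kafka-server-stop.sh                      # stop kafka
bin/kafka-server-start.sh -daemon config/server.properties   # start it again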
