Category archive: Big Data

Trying out docker-hive

1. Clone the docker-hive repository from https://github.com/big-data-europe/docker-hive.git and set it up.
2. Edit its docker-compose.yml and add a volume mapping to every container:

version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - /data/namenode:/hadoop/dfs/name
      - /data/tools:/tools
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - /data/datanode:/hadoop/dfs/data
      - /data/tools:/tools
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    volumes:
      - /data/tools:/tools
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    volumes:
      - /data/tools:/tools
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
    volumes:
      - /data/tools:/tools

  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    volumes:
      - /data/tools:/tools
    ports:
      - "8080:8080"

3. Create a test text file (the later steps assume it is saved as /data/tools/example.txt on the host, i.e. /tools/example.txt inside the containers):

1,xiaoming,book-TV-code,beijing:chaoyang-shagnhai:pudong
2,lilei,book-code,nanjing:jiangning-taiwan:taibei
3,lihua,music-book,heilongjiang:haerbin
3,lihua,music-book,heilongjiang2:haerbin2
3,lihua,music-book,heilongjiang3:haerbin3

4. Start the services and connect to Hive.

docker-compose up -d
docker-compose exec hive-server bash
/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000
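
To confirm that HiveServer2 is actually reachable before going further, a quick check from inside the hive-server container:

# Run a single statement through the same JDBC URL and exit
/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -e "show databases;"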


5. Create an external table

create external table t2(
    id      int
   ,name    string
   ,hobby   array<string>
   ,add     map<string,string>
)
row format delimited
fields terminated by ','
collection items terminated by '-'
map keys terminated by ':'
location '/user/t2';


6. Upload the file into the table's directory (a quick way to check the result is shown after the two methods).
Method 1: from the beeline session:
load data local inpath '/tools/example.txt' overwrite into table t2;   -- removes every existing file in the directory, then writes the new one
load data local inpath '/tools/example.txt' into table t2;             -- adds the new file alongside the existing ones (the only difference is overwrite)
Method 2: upload the file directly with hadoop fs -put:
hadoop fs -put /tools/example.txt /user/t2        # keeps the original file name
hadoop fs -put /tools/example.txt /user/t2/1.txt  # stores it as 1.txt
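
Whichever method is used, the result can be checked directly in HDFS; a minimal sketch, run from inside the hive-server (or namenode) container:

# List the files now sitting under the table's location
hadoop fs -ls /user/t2
# Dump their raw contents (the glob avoids assuming a particular file name)
hadoop fs -cat /user/t2/*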
7. Verify from the Hive command line

select * from t2;   -- run this again after each upload to see the new rows


8. The newly uploaded files can also be browsed in the Hadoop file manager (the NameNode web UI on port 50070).

Duplicate records within the same file are de-duplicated automatically.
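
A quick way to sanity-check this is to compare the row count reported by Hive with the number of lines actually stored in HDFS (a minimal sketch, run from the hive-server container):

/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -e "select count(*) from t2;"
hadoop fs -cat /user/t2/* | wc -l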

——————————————-
What about SequenceFiles?
1. Inspect the contents of the SequenceFile.
hadoop fs -Dfs.default.name=file:/// -text /tools/mytest.gzip.sf   # fs.default.name is deprecated
hadoop fs -Dfs.defaultFS=file:/// -text /tools/mytest.txt.sf

The actual contents are:

2. Create the table

  create external table sfgz(
     `idx` string,
     `userid` string,
     `flag` string,
     `count` string,
     `value` string,
     `memo` string)
  partitioned by (dt string)
  row format delimited fields terminated by ','
  stored as sequencefile
  location '/user/sfgz';

3. Upload the files

Method 1: create partition directories and copy the files in with hadoop fs:
hadoop fs -mkdir -p /user/sfgz/dt=2010-05-06/
hadoop fs -put /tools/mytest.txt.sf /user/sfgz/dt=2019-05-17
hadoop fs -put /tools/mytest.txt.sf /user/sfgz/dt=2010-05-04
With this method the partitions still have to be reloaded manually before Hive can see them; the reload command is shown in the sketch after method 2.
Method 2: load the files through Hive, after which they can be queried right away:
load data local inpath '/tools/mytest.txt.sf' into table sfgz partition(dt='2009-03-01');
load data local inpath '/tools/mytest.gzip.sf' into table sfgz partition(dt='2000-03-02');
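
As noted under method 1, partition directories created with hadoop fs still need to be registered; a hedged sketch of the two usual ways to do that (run from the hive-server container as before, using the dt=2010-05-06 directory created above):

# Option A: scan the table location and add any partition directories Hive does not know about
/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -e "msck repair table sfgz;"
# Option B: register a single partition explicitly
/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -e "alter table sfgz add if not exists partition (dt='2010-05-06');"
# Verify that the partitions are visible
/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000 -e "show partitions sfgz;"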

Automatically cleaning up expired ES indices

Adapted from: https://www.jianshu.com/p/f4af986ca164
Note: this is someone else's script with targeted modifications for my environment. If you want to use it directly, check carefully that your index names look like the listing below; if they do not, the script needs to be adapted.
1. Write the cleanup script

The cleanup only supports indices named in this form (a prefix followed by a date):
green  open   k8s-stdout-2018.11.01                     PHEIz5NXSw-ljRvOP1cj9Q   5   1    3748246            0      3.2gb          1.6gb
green  open   recom-nginxacclog-2018.10.27              jl5ZBPHsQBifN0-pXEp_yw   5   1   52743481            0     20.8gb         10.4gb
green  open   nginx-json-acclog-2018.11.09              r6jsGcHWRV6RP2mY6zFxDA   5   1      95681            0     50.4mb         25.1mb
green  open   watcher_alarms-2018.11.09                 r_GS_GQGRoCgyOgiCFCuQw   5   1         24            0    323.4kb        161.7kb
green  open   .monitoring-es-6-2018.11.12               o8H2S-iERnuIAWQT7wuBRA   1   1       4842            0      3.2mb          1.6mb
green  open   client-nginxacclog-2018.11.02             FxXdLPpiSnuBtrJOB2BRsw   5   1  179046160            0    220.8gb        110.3gb
green  open   k8s-stderr-2018.11.16                     Vggw7iYCQ6OHtW-sphgGrA   5   1      68546            0     20.4mb         10.2mb
green  open   k8s-stderr-2018.10.21                     ZCeZZFRWSVyyYDKMjzPnbw   5   1      15454            0      5.3mb          2.6mb
green  open   watcher_alarms-2018.10.30                 VqrbMbnuQ3ChPysgnwgm2w   5   1         44            0      371kb        185.5kb
#!/bin/bash
######################################################
# $Name:        clean_es_index.sh
# $Version:     v1.0
# $Function:    clean es log index
# $Author:      sjt
# $Create Date: 2018-05-14
# $Description: shell
######################################################
#The script does not take a file lock; add one if you need it
#Path of the script's own log file
CLEAN_LOG="/root/clean_es_index.log"
#Index prefix to match (first argument)
INDEX_PREFIX=$1
DELTIME=$2
if [ "$DELTIME"x = ""x ]; then
   DELTIME=30
fi

echo "ready to delete indices older than $DELTIME days!!!"

#elasticsearch host ip and port
SERVER_PORT=10.2.1.2:9200
#fetch the existing index names
if [ "$INDEX_PREFIX"x = ""x ]; then
     echo  "curl -s \"${SERVER_PORT}/_cat/indices?v\""
     INDEXS=$(curl -s "${SERVER_PORT}/_cat/indices?v" |awk '{print $3}')
else
     echo " curl -s \"${SERVER_PORT}/_cat/indices?v\""
     INDEXS=$(curl -s "${SERVER_PORT}/_cat/indices?v" |grep "${INDEX_PREFIX}"|awk '{print $3}')
fi
#how many days of indices to keep: passing 10 means indices older than 10 days are deleted
#cutoff expressed as seconds since 1970-01-01 00:00:00
EXPIRE_SECONDS=$(date -d  "$(date  +%F) -${DELTIME} days" +%s)
#create the log file if it does not exist yet
if [ ! -f  "${CLEAN_LOG}" ]
then
touch "${CLEAN_LOG}"
fi
#delete the indices older than the cutoff date
echo "----------------------------clean time is $(date +%Y-%m-%d_%H:%M:%S) ------------------------------" >>${CLEAN_LOG}
for del_index in ${INDEXS}
do
    indexDate=$( echo ${del_index} | awk -F '-' '{print $NF}' )
    # echo "${del_index}"
    format_date=$(echo ${indexDate}| sed 's/\.//g')
    #the date is taken from the last '-'-separated part of the index name; adjust this if your index names are structured differently
    indexSecond=$( date -d ${format_date} +%s )
    #echo "$EXPIRE_SECONDS - $indexSecond = $(( $EXPIRE_SECONDS - $indexSecond ))"
    if [ "$EXPIRE_SECONDS" -gt "$indexSecond" ]; then
        echo "it will delete ${del_index}......."
        echo "${del_index}" >> ${CLEAN_LOG}
        #capture the response of the delete request
        delResult=`curl -s  -XDELETE "${SERVER_PORT}/"${del_index}"?pretty" |sed -n '2p'`
        #append to the log
        echo "clean time is $(date)" >>${CLEAN_LOG}
        echo "delResult is ${delResult}" >>${CLEAN_LOG}
    fi
done
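
For reference, a couple of hedged invocation examples (the first argument is an index prefix from the listing above, the second the number of days to keep, defaulting to 30):

# delete k8s-stdout-* indices older than 30 days (the default)
sh clean_es_index.sh k8s-stdout
# delete recom-nginxacclog-* indices older than 7 days
sh clean_es_index.sh recom-nginxacclog 7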

2. Add a crontab job

# open the crontab editor
crontab -e
# then add the following line (runs the cleanup daily at 01:10)
10 1 * * * sh /app/elk/es/es-index-clear.sh > /dev/null 2>&1

Using ruby in logstash

In logstash.conf, the ruby filter can be used to modify a field's value dynamically.

filter {
    if [type] == "deployment" {
        drop {}
    }

    mutate {
        remove_field => ["kafka"]
    }

    ruby {
        code => "
        timestamp = event.get('@timestamp') # read the @timestamp field from the event
        localtime = timestamp.time + 28800  # add an 8-hour offset (UTC+8)
        localtimeStr = localtime.strftime('%Y.%m.%d')
        event.set('localtime', localtimeStr) # store the local date string in a new field
        "
    }
}
filter {
    grok {
        #match => {"message"=>'(?<remote_addr>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?<remote_user>(\s+)|-) (?<timelocal>.*) "(?<method>.*?) (?<uri>.*?)\?d=(?<string>.*?) (?<protocol>\S+)" (?<status>\d+) (?<body_bytes_sent>\d+) "(?<http_referer>.*?)" "(?<http_user_agent>.*?)" "(?<http_x_forwarded_for>.*?)'}
        #match => {"message"=>'(?<remote_addr>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?<remote_user>(\s+)|-) (?<timelocal>.*) "(?<method>.*?) (?<uri>.*?)\?d=(?<string>.*?) (?<protocol>\S+)" (?<status>\d+) (?<body_bytes_sent>\d+) "(?<http_referer>.*?)" "(?<http_user_agent>.*?)".*?'}
        match => {"message"=>'(?<remote_addr>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?<remote_user>(\s+)|-) \[(?<timelocal>.*)\] "(?<method>.*?) (?<uri>.*?)\?(d=)?(?<string>.*?) (?<protocol>\S+)" (?<status>\d+) (?<body_bytes_sent>\d+) "(?<http_referer>.*?)" "(?<http_user_agent>.*?)".*?'}
    }

    if [tags] {
        drop {}
    }

    if [status] != "200" {
        drop {}
    }

    ruby {
        init => "require 'base64'"
        code => "
        string = event.get('string')
        if string
            begin
                b64 = Base64.decode64(string).force_encoding('utf-8')
                #puts b64, event.get('message')
                event.set('b64_decode', b64)
            rescue ArgumentError
                event.set('b64_decode', '')
            end
        else
            event.set('b64_decode', '')
        end
        "
    }

    if [b64_decode] == "" {
        drop {}
    }

    kv {
        source => "b64_decode"
        field_split => "&?"
        value_split => "="
    }

    if [type] == "template" {
        mutate {
            remove_field => ["@timestamp", "@version", "b64_decode", "message", "string", "body_bytes_sent", "timelocal", "http_user_agent", "http_referer", "status", "protocol", "uri", "ver", "remote_user", "remote_addr", "host", "method", "path"]
        }
    } else {
        date {
            match => ["timelocal", "dd/MMM/yyyy:HH:mm:ss Z"]
            target => "@timestamp"
        }

        mutate {
            remove_field => ["@version", "b64_decode", "message", "string", "body_bytes_sent", "timelocal", "http_user_agent", "http_referer", "status", "protocol", "uri", "ver", "remote_user", "remote_addr", "host", "method", "path"]
        }
    }

}
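
Before deploying a filter chain like this, it helps to let logstash validate the configuration first; a minimal sketch, assuming a package install under /usr/share/logstash and the pipeline file at /etc/logstash/conf.d/logstash.conf:

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf --config.test_and_exit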

A generic filebeat / kafka / ES configuration

1. Configure the nginx log format

log_format appstore escape=json '{"@timestamp":"$time_iso8601",'
                        '"@source":"$server_addr",'
                        '"hostname":"$hostname",'
                        '"xforward":"$http_x_forwarded_for",'
                        '"remoteaddr":"$remote_addr",'
                        '"method":"$request_method",'
                        '"scheme":"$scheme",'
                        '"domain":"$server_name",'
                        '"referer":"$http_referer",'
                        '"url":"$request_uri",'
                        '"args":"$args",'
                        '"requestbody":"$request_body",'  #根据实际情况开放,否则会导致内容太大。
                        '"bodybytessend":$body_bytes_sent,'
                        '"status":$status,'
                        '"requesttime":$request_time,'
                        '"upstreamtime":"$upstream_response_time",'  #需要加引号,某些upstream放弃处理时,其时间会为空的。
                        '"upstreamaddr":"$upstream_addr",'
                        '"useragent":"$http_user_agent"'
                        '}';

When a log line is not valid JSON, logstash still passes it through and stores it in ES, adding a parse-failure value to the [tags] field.

2. Usage: reference the log format from an nginx location block

location /api/app/ {
        access_log /data/log/nginx/appstore.app.abc.cn.access.log appstore buffer=32k flush=5s;
        error_log /data/log/nginx/appstore.app.abc.cn.error.log;
        proxy_pass http://appstore.app.abc.cn:8001/;
}
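
After wiring in the access_log, a quick check that nginx accepts the configuration and that each request is written as a single JSON line:

# validate and reload the configuration
nginx -t && nginx -s reload
# watch the access log configured above
tail -f /data/log/nginx/appstore.app.abc.cn.access.log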

3. Configure filebeat
vim /etc/filebeat/filebeat.yml

filebeat.prospectors:
- input_type: log
  paths:
    - /data/log/nginx/appstore.app.wps.cn.access.log
  document_type: simplejson
  fields_under_root: true
  fields:
    es_index_type: nginx-access
    es_index_prefix: appstore-app-wps-cn
  tail_files: true

#================================ Processors ===================================
processors:
- drop_event:
    when:
       contains:
           message: "HEAD /lb_health.php"


#----------------------------- output ----------------------------------
output.kafka:
  hosts: ["10.0.0.33:9092","10.0.0.59:9092","10.0.0.67:9092"]
  topic: "simple_json_for_all"
  partition.round_robin:
    reachable_only: false

  required_acks: 0
  compression: none
  max_message_bytes: 1000000


#----------------------------- file output for debug ----------------------------------
output.file:
  # true: will output to  file, false: disable output to file.
  enabled: false
  path: "/data/log/filebeat"
  filename: debug.log

#================================ Logging =====================================

logging.level: info
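
Filebeat can check this configuration before a restart; a hedged sketch (the test subcommands exist in filebeat 6.x, while 5.x uses filebeat -configtest instead):

filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml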

4. Configure the nginx-access pipeline in logstash's logstash.conf.

input {
    kafka {
        bootstrap_servers => "10.0.0.33:9092,10.0.0.59:9092,10.0.0.67:9092"
        topics => ["simple_json_for_all"]
        group_id => "simple_json_consumer"
        consumer_threads => 5
        codec => 'json'
        decorate_events => true
    }
}

filter {
   if [type] == "simplejson" {
       json {
           source => "message"
           remove_field => ["message"]
           remove_field => ["kafka"]
           remove_field => ["beat"]
       }
   }
}

output {
    #stdout{codec=>rubydebug}
    if [type] == "simplejson" {
        if [es_index_prefix] and [es_index_type] {
            elasticsearch {
                hosts => ["10.2.1.2:9200", "10.2.1.11:9200", "10.2.1.15:9200"]
                # es_index_prefix & es_index_type were defined in filebeat.yml
                index => "%{es_index_prefix}-%{es_index_type}-%{+YYYY.MM.dd}"
                manage_template => true
            }
        } else {
            elasticsearch {
                hosts => ["10.2.1.2:9200", "10.2.1.11:9200", "10.2.1.15:9200"]
                index => "default-simplejson-%{+YYYY.MM.dd}"
                manage_template => true
            }
        }
    }
}
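
If no documents show up in the index check of step 5, it can help to confirm that events are reaching Kafka at all; a minimal sketch using the standard console consumer (the /opt/kafka path is an assumption about where Kafka is installed):

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.33:9092 \
  --topic simple_json_for_all --from-beginning --max-messages 5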

5. Check that the index exists
curl -XGET 'http://10.2.1.2:9200/_cat/indices?v' | grep nginx-access

Elasticsearch data persistence

In this setup Elasticsearch does not persist data by default, so indexed data tends to disappear after a few minutes; configure the following in elasticsearch.yml to keep index data on disk.

#each cluster must have a unique name
cluster.name: es_vm_test
node.name: vmmaster
network.host: 0.0.0.0
http.port: 9200
#where index data and logs are stored
path.data: /home/abc/elk-5.5.1/elkdata/data
path.logs: /home/abc/elk-5.5.1/elkdata/log
#disable login authentication
xpack.security.enabled: false
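
After restarting Elasticsearch, a quick check that the node is up and that index data is really landing in the configured path.data directory:

curl -s 'http://localhost:9200/_cluster/health?pretty'
ls /home/abc/elk-5.5.1/elkdata/data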

Collecting nginx logs with logstash

The following was tested on an open-source ELK + LNMP stack.
You can also refer to these guides:
https://kibana.logstash.es/content/logstash/plugins/codec/json.html
https://kibana.logstash.es/content/logstash/plugins/codec/multiline.html
The same documentation covers many more scenarios:
https://kibana.logstash.es/content/
https://kibana.logstash.es/content/logstash/examples/

1. Collect the nginx logs

input {
    file {
        # path => ["/home/wwwlogs/h5.vim.vim.com.log", "/home/wwwlogs/h5.vim.vim.com2.log"]
	path => "/home/wwwlogs/h5.vim.vim.com.log"
        exclude => "*.zip"
        type => "java"
        add_field => [ "domain", "h5.vim.vim.com" ]
        codec => multiline {
                      pattern => "^\s+"
                      what => previous
              }
    }
    file {
        # path => ["/home/wwwlogs/h5.api.vim.vim.com.log", "/home/wwwlogs/h5.api.vim.vim.com2.log"]
	path => "/home/wwwlogs/h5.api.vim.vim.com.log"
        exclude => ["*.zip", "*.gz"]
        type => "java"
        add_field => [ "domain", "h5.api.vim.vim.com" ]
        codec => multiline {
                        pattern => "^\s+"
                        what => previous
                 }
    }
}
filter {

}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["0.0.0.0:9200"]
        index => "logstash-%{domain}-%{+YYYY.MM.dd}"
    }
}

2. Periodically clean up old indices

#!/bin/bash

# --------------------------------------------------------------
# This script is to delete ES indices older than specified days.
# Version: 1.0
# --------------------------------------------------------------

function usage() {
        echo "Usage: `basename $0` -s ES_SERVER -d KEEP_DAYS [-w INTERVAL]"
}


PREFIX='logstash-'
WAITTIME=2
NOW=`date  +%s.%3N`
LOGPATH=/apps/logs/elasticsearch


while getopts d:s:w: opt
do
        case $opt in
        s) SERVER="$OPTARG";;
        d) KEEPDAYS="$OPTARG";;
        w) WAITTIME="$OPTARG";;
        *) usage;;
        esac
done

if [ -z "$SERVER" -o -z "$KEEPDAYS" ]; then
        usage
fi

if [ ! -d $LOGPATH ]; then
        mkdir -p $LOGPATH
fi


INDICES=`curl -s $SERVER/_cat/indices?h=index | grep -P '^logstash-.*\d{4}.\d{2}.\d{2}' | sort`
for index in $INDICES
do
        date=`echo $index | awk -F '-' '{print $NF}' | sed 's/\./-/g' | xargs -I{} date -d {} +%s.%3N`
        delta=`echo "($NOW-$date)/86400" | bc`
        if [ $delta -gt $KEEPDAYS ]; then
                echo "deleting $index" | tee -a $LOGPATH/es_delete_indices.log
                curl -s -XDELETE $SERVER/$index | tee -a $LOGPATH/es_delete_indices.log
                echo | tee -a $LOGPATH/es_delete_indices.log
                sleep $WAITTIME
        fi
done
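
For reference, a hedged usage example (the script name is whatever you saved it as; -s is the ES server, -d the number of days to keep, -w the pause in seconds between deletions):

./clean_logstash_indices.sh -s 10.2.1.2:9200 -d 30 -w 5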

Some machine learning libraries

Gensim is a fairly specialized Python toolkit for computing text similarity.
In text processing, for example when mining product reviews, it is sometimes useful to know how similar each review is to the product description, as a measure of how objective the review is.
The higher the similarity between a review and the product description, the more formal the wording tends to be: less emotional, more focused on the product's attributes and characteristics, and therefore more objective.
http://radimrehurek.com/gensim/

————————————-
An OCR / image recognition library
https://github.com/tesseract-ocr/tesseract

tesseract-ocr, originally developed by HP, has been updated to version 2.04; it is the OCR engine that Google has recently been backing. It was originally written by HP and is now open source.