datax elasticsearchwriter 使用

  • 2022-07-14
  • 浏览 (2130)

DataX ElasticSearchWriter

1 快速介绍

数据导入elasticsearch的插件

2 实现原理

使用elasticsearch的rest api接口, 批量把从reader读入的数据写入elasticsearch

3 功能说明

3.1 配置样例

job.json

{
  "job": {
    "setting": {
        "speed": {
            "channel": 1
        }
    },
    "content": [
      {
        "reader": {
          ...
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://xxx:9999",
            "accessId": "xxxx",
            "accessKey": "xxxx",
            "index": "test-1",
            "type": "default",
            "cleanup": true,
            "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
            "discovery": false,
            "batchSize": 1000,
            "splitter": ",",
            "column": [
              {"name": "pk", "type": "id"},
              { "name": "col_ip","type": "ip" },
              { "name": "col_double","type": "double" },
              { "name": "col_long","type": "long" },
              { "name": "col_integer","type": "integer" },
              { "name": "col_keyword", "type": "keyword" },
              { "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
              { "name": "col_geo_point", "type": "geo_point" },
              { "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
              { "name": "col_nested1", "type": "nested" },
              { "name": "col_nested2", "type": "nested" },
              { "name": "col_object1", "type": "object" },
              { "name": "col_object2", "type": "object" },
              { "name": "col_integer_array", "type":"integer", "array":true},
              { "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
            ]
          }
        }
      }
    ]
  }
}

3.2 参数说明

  • endpoint

    • 描述:ElasticSearch的连接地址
    • 必选:是
    • 默认值:无
  • accessId

    • 描述:http auth中的user
    • 必选:否
    • 默认值:空
  • accessKey

    • 描述:http auth中的password
    • 必选:否
    • 默认值:空
  • index

    • 描述:elasticsearch中的index名
    • 必选:是
    • 默认值:无
  • type

    • 描述:elasticsearch中index的type名
    • 必选:否
    • 默认值:index名
  • cleanup

    • 描述:是否删除原表
    • 必选:否
    • 默认值:false
  • batchSize

    • 描述:每次批量数据的条数
    • 必选:否
    • 默认值:1000
  • trySize

    • 描述:失败后重试的次数
    • 必选:否
    • 默认值:30
  • timeout

    • 描述:客户端超时时间
    • 必选:否
    • 默认值:600000
  • discovery

    • 描述:启用节点发现将(轮询)并定期更新客户机中的服务器列表。
    • 必选:否
    • 默认值:false
  • compression

    • 描述:http请求,开启压缩
    • 必选:否
    • 默认值:true
  • multiThread

    • 描述:http请求,是否有多线程
    • 必选:否
    • 默认值:true
  • ignoreWriteError

    • 描述:忽略写入错误,不重试,继续写入
    • 必选:否
    • 默认值:false
  • ignoreParseError

    • 描述:忽略解析数据格式错误,继续写入
    • 必选:否
    • 默认值:true
  • alias

    • 描述:数据导入完成后写入别名
    • 必选:否
    • 默认值:无
  • aliasMode

    • 描述:数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)
    • 必选:否
    • 默认值:append
  • settings

    • 描述:创建index时候的settings, 与elasticsearch官方相同
    • 必选:否
    • 默认值:无
  • splitter

    • 描述:如果插入数据是array,就使用指定分隔符
    • 必选:否
    • 默认值:-,-
  • column

    • 描述:elasticsearch所支持的字段类型,样例中包含了全部
    • 必选:是
  • dynamic

    • 描述: 不使用datax的mappings,使用es自己的自动mappings
    • 必选: 否
    • 默认值: false

4 性能报告

4.1 环境准备

  • 总数据量 1kw条数据, 每条0.1kb
  • 1个shard, 0个replica
  • 不加id,这样默认是append_only模式,不检查版本,插入速度会有20%左右的提升

4.1.1 输入数据类型(streamreader)

{"value": "1.1.1.1", "type": "string"},
{"value": 19890604.0, "type": "double"},
{"value": 19890604, "type": "long"},
{"value": 19890604, "type": "long"},
{"value": "hello world", "type": "string"},
{"value": "hello world", "type": "string"},
{"value": "41.12,-71.34", "type": "string"},
{"value": "2017-05-25", "type": "string"},

4.1.2 输出数据类型(eswriter)

{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date"}

4.1.2 机器参数

  1. cpu: 32 Intel® Xeon® CPU E5-2650 v2 @ 2.60GHz
  2. mem: 128G
  3. net: 千兆双网卡

4.1.3 DataX jvm 参数

-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError

4.2 测试报告

通道数 批量提交行数 DataX速度(Rec/s) DataX流量(MB/s)
4 256 11013 0.828
4 1024 19417 1.43
4 4096 23923 1.76
4 8172 24449 1.80
8 256 21459 1.58
8 1024 37037 2.72
8 4096 45454 3.34
8 8172 45871 3.37
16 1024 67567 4.96
16 4096 78125 5.74
16 8172 77519 5.69
32 1024 94339 6.93
32 4096 96153 7.06
64 1024 91743 6.74

4.3 测试总结

  • 最好的结果是32通道,每次传4096,如果单条数据很大, 请适当减少批量数,防止oom
  • 当然这个很容易水平扩展,而且es也是分布式的,多设置几个shard也可以水平扩展

5 约束限制

  • 如果导入id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
  • 如果不导入id,就是append_only模式,elasticsearch自动生成id,速度会提升20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)

相关文章

datax README 使用

datax adbpgwriter 使用

datax adswriter 使用

datax cassandrareader 使用

datax cassandrawriter 使用

datax dataxPluginDev 使用

datax drdsreader 使用

datax drdswriter 使用

datax ftpreader 使用

datax ftpwriter 使用

1  赞