1 介绍
es数据导出到csv文件,暂时不考虑效率问题,仅谈实现方式。
2 python3
def connect_elk():
    """Build and return the Elasticsearch client.

    The node URL and the basic-auth credentials are hard-coded in this
    function; callers receive a ready-to-use client object.
    """
    return Elasticsearch(
        hosts='http://192.168.56.20:9200',
        http_auth=("elastic", "elastic密码"),
        # Refresh the node list and reconnect when a node stops responding.
        sniff_on_connection_fail=True,
        # Refresh every 60 seconds.
        sniffer_timeout=60,
    )
from elasticsearch import Elasticsearch
import csv
from common.util_es import connect_elk
# Module-level client shared by the export code below; it is created as soon
# as this module is executed.
es = connect_elk()
'''
查询所有数据并导出
'''
# The note above reads "query all data and export it".
# The three names below match ExportCsv's parameters (index, body, item);
# the call itself is not shown in this file.
# Name of the index to export.
index = 'blog_rate'
# Empty search body (no query clause); put a query DSL dict here to filter.
body = {}
# Fields read from each hit's `_source`, in CSV column order.
item = ["r_id", "a_id"]
def ExportCsv(index, body, item):
    """Export the ``item`` fields of every hit matched by ``body`` to a CSV file.

    Runs a scroll search against ``index`` through the module-level ``es``
    client and writes one row per hit to ``./<index>.csv`` (UTF-8 with BOM,
    no header row). Rows are written page by page, so only one page of hits
    (at most 1000) is held in memory at a time.

    Args:
        index: Name of the index to search; also used as the CSV file name.
        body: Search request body, passed to ``es.search`` unchanged.
        item: Field names to read from each hit's ``_source``; their order
            is the column order of the CSV.

    Raises:
        KeyError: If a hit's ``_source`` lacks one of the ``item`` fields.
    """
    page = es.search(index=index, body=body, scroll='5m', size=1000)
    scroll_id = page['_scroll_id']
    try:
        with open('./' + index + '.csv', 'w', newline='', encoding="utf_8_sig") as flow:
            csv_writer = csv.writer(flow)
            hits = page['hits']['hits']
            # Stop on the first empty page rather than deriving a page count
            # from hits.total: the reported total can be a capped lower
            # bound, and the old count assumed 100 hits per page while the
            # request asks for 1000.
            while hits:
                csv_writer.writerows(
                    [hit["_source"][field] for field in item] for hit in hits
                )
                # The scroll argument must be passed on every call to keep
                # the search context alive for the next page.
                page = es.scroll(scroll_id=scroll_id, scroll='5m')
                # Each response may carry a new scroll id; always continue
                # with the most recent one.
                scroll_id = page['_scroll_id']
                hits = page['hits']['hits']
    finally:
        # Release the server-side search context instead of letting it sit
        # until the 5-minute keep-alive expires.
        es.clear_scroll(scroll_id=scroll_id)
    print('done!')
<h2><a id="1__0"></a>1 介绍</h2>
<p>es数据导出到csv文件,暂时不考虑效率问题,仅谈实现方式。</p>
<h2><a id="2_python3_3"></a>2 python3</h2>
<pre><div class="hljs"><code class="lang-python"><span class="hljs-keyword">def</span> <span class="hljs-title function_">connect_elk</span>():
client = Elasticsearch(hosts=<span class="hljs-string">'http://192.168.56.20:9200'</span>,
http_auth=(<span class="hljs-string">"elastic"</span>, <span class="hljs-string">"elastic密码"</span>),
<span class="hljs-comment"># 在做任何操作之前,先进行嗅探</span>
<span class="hljs-comment"># sniff_on_start=True,</span>
<span class="hljs-comment"># 节点没有响应时,进行刷新,重新连接</span>
sniff_on_connection_fail=<span class="hljs-literal">True</span>,
<span class="hljs-comment"># 每 60 秒刷新一次</span>
sniffer_timeout=<span class="hljs-number">60</span>
)
<span class="hljs-keyword">return</span> client
</code></div></pre>
<pre><div class="hljs"><code class="lang-python"><span class="hljs-keyword">from</span> elasticsearch <span class="hljs-keyword">import</span> Elasticsearch
<span class="hljs-keyword">import</span> csv
<span class="hljs-comment"># 获取es数据库</span>
<span class="hljs-keyword">from</span> common.util_es <span class="hljs-keyword">import</span> connect_elk
es = connect_elk()
<span class="hljs-string">'''
查询所有数据并导出
'''</span>
index = <span class="hljs-string">'blog_rate'</span>
body = {}
item = [<span class="hljs-string">"r_id"</span>, <span class="hljs-string">"a_id"</span>]
<span class="hljs-comment"># body = {</span>
<span class="hljs-comment"># "query": {</span>
<span class="hljs-comment"># "match": {"name": "张三"},</span>
<span class="hljs-comment"># }</span>
<span class="hljs-comment"># }</span>
<span class="hljs-keyword">def</span> <span class="hljs-title function_">ExportCsv</span>(<span class="hljs-params">index, body,item</span>):
query = es.search(index=index, body=body, scroll=<span class="hljs-string">'5m'</span>, size=<span class="hljs-number">1000</span>)
<span class="hljs-comment"># es查询出的结果第一页</span>
results = query[<span class="hljs-string">'hits'</span>][<span class="hljs-string">'hits'</span>]
<span class="hljs-comment"># es查询出的结果总量</span>
total = query[<span class="hljs-string">'hits'</span>][<span class="hljs-string">'total'</span>][<span class="hljs-string">"value"</span>]
<span class="hljs-comment"># 游标用于输出es查询出的所有结果</span>
scroll_id = query[<span class="hljs-string">'_scroll_id'</span>]
<span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> <span class="hljs-built_in">range</span>(<span class="hljs-number">0</span>, <span class="hljs-built_in">int</span>(total / <span class="hljs-number">100</span>) + <span class="hljs-number">1</span>):
<span class="hljs-comment"># scroll参数必须指定否则会报错</span>
query_scroll = es.scroll(scroll_id=scroll_id, scroll=<span class="hljs-string">'5m'</span>)[<span class="hljs-string">'hits'</span>][<span class="hljs-string">'hits'</span>]
results += query_scroll
<span class="hljs-keyword">with</span> <span class="hljs-built_in">open</span>(<span class="hljs-string">'./'</span> + index + <span class="hljs-string">'.csv'</span>, <span class="hljs-string">'w'</span>, newline=<span class="hljs-string">''</span>, encoding=<span class="hljs-string">"utf_8_sig"</span>) <span class="hljs-keyword">as</span> flow:
csv_writer = csv.writer(flow)
<span class="hljs-keyword">for</span> res <span class="hljs-keyword">in</span> results:
csvrow1 = []
<span class="hljs-keyword">for</span> i <span class="hljs-keyword">in</span> item:
csvrow1.append(res[<span class="hljs-string">"_source"</span>][i])
csv_writer.writerow(csvrow1)
<span class="hljs-built_in">print</span>(<span class="hljs-string">'done!'</span>)
</code></div></pre>
留言