[NodeJS] ElasticSearch bulk

[NodeJS] ElasticSearch bulk

728x90

elasticsearch 7.x 기준

try { let bulk = []; for(let item of files_info) { const fileName = item.originalname; const fileSize = item.size; const file_mk_dt = moment().format('YYYY-MM-DD HH:mm:ss'); const fileContent = item.buffer.toString('base64'); const data = { file_name : fileName, file_size : fileSize, file_content : fileContent, file_mk_dt : file_mk_dt } bulk.push({"index": {"_index": index_name, "_type": "_doc"}}); bulk.push(data); } let rs; if (bulk.length > 0) { rs = await es_client.bulk({ body: bulk, refresh: 'wait_for' }) Log.info(rs); } rt.error = false; rt.result = rs; } catch (err) { Log.error(err); rt.error = true; rt.msg = 'err'; rt.result = err.message; }

for문을 돌려서 배열에 인덱스 정보, 데이터를 푸시해준 뒤 bulk 해주면 됨.

상황에 따라 특정 쿼리로 스크롤 검색을 이용해 while문을 사용해서 데이터를 모두 뽑고 bulk로 데이터를 update 할 수도 있음.

let rs = await ES_CLIENT.search({ index: idx_threat + data._datetime.substring(0, 4), type: 'doc', scroll: '3m', body: query }) while(rs.hits.hits.length > 0) { rs.hits.hits.map( (doc) => { bulk.push({update:{_index: idx_threat + doc._source._datetime.substring(0,4), _type: 'doc', _id: doc._id}}); bulk.push({doc: {TW_STATE: '3'}}); }) rs = await ES_CLIENT.scroll({ scrollId: rs._scroll_id, scroll: '3m' }); }

특정 인덱스에서 쿼리에 해당하는 데이터의 개수가 많아 스크롤 검색을 사용했고, while문으로 데이터를 끝까지 뽑으면서 데이터를 수정하는 bulk를 만드는 로직.

반응형

from http://minu0807.tistory.com/113 by ccl(A) rewrite - 2021-11-23 09:01:09