on
[트러블슈팅 Logstash] Input Kafka plugin, IllegalStateException: No...
[트러블슈팅 Logstash] Input Kafka plugin, IllegalStateException: No...
반응형
개요
다음 메트릭 데이터 수집 파이프라인 구성 중에 발생한 문제이다.
CollectD(x3) -> logstash -> kafka(x3)
운영 환경은 다음과 같다.
OS: Amazon Linux2 (x7, collectd x3, logstash, kafka x3)
instance type: m5.large
Kafka의 모든 노드는 kafka, Zookeeper가 설치되어 있으며 클러스터로 구성되어 있다.
문제 상황
Kafka 에 제대로 된 데이터 수집이 안되고 있어서 Logstash 의 로그를 확인해 보았다. ( /var/log/logstash/input.log 에서 CollectD 에 데이터가 수집되는 것을 확인하였다.)
$ tail -f /var/log/logstash/logstash-plain.log ... [2021-09-30T08:50:34,929][ERROR][org.apache.kafka.clients.producer.internals.Sender] [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for connection 2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:921) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:335) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308) ~[kafka-clients-2.1.0.jar:?] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233) [kafka-clients-2.1.0.jar:?] at java.lang.Thread.run(Thread.java:829) [?:?] ...
현재 문제가 되는 Logstash 의 구성 파일은 다음과 같다.
/etc/logstash/conf.d/collectd-to-kafka.conf
input { udp { port => 25826 buffer_size => 1452 codec => collectd { } } } output { file { path => "/var/log/logstash/input.log" codec => rubydebug } kafka { bootstrap_servers => "10.0.0.10:9092,10.0.0.11:9092,10.0.0.12:9092" codec => json topic_id => "system_metric" } }
문제 해결
다행히, 같은 문제에 대해서 원인을 찾고 해결한 문서를 찾을 수 있었다. 결론만 내리자면, Logstash 가 사용하는 카프카 클라이언트 라이브러리는 Kafka 의 노드들을 찾을 때, 구성했던 노드 이름으로 찾게 된다. 내가 구성했던 카프카 클러스터의 정보는 다음과 같다.
/home/ec2-user/apps/kafka/config/server.properties
broker.id=0 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://node1:9092 zookeeper.connect=node1:2181,node2:2181,node3/test log.dirs=/tmp/kafka-logs-0
이 때, node* 은 /etc/hosts 에 private ip와 매핑되게끔 설정해두었다. 따라서 Logstash 의 호스트에서도 같은 작업을 진행하면 된다. /etc/hosts 에 private ip와 Kafka 노드의 이름을 알맞게 매핑시켜준다.
/etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 localhost6 localhost6.localdomain6 10.0.0.11 node01 10.0.0.12 node02 10.0.0.13 node03
그 후 구성 파일을 다음과 같이 수정한다.
/etc/logstash/conf.d/collectd-to-kafka.conf
input { udp { port => 25826 buffer_size => 1452 codec => collectd { } } } output { # 이 설정은 디버깅 용이다. 나중에 삭제하는 것이 좋다. file { path => "/var/log/logstash/input.log" codec => rubydebug } kafka { bootstrap_servers => "node1:9092,node2:9092,node3:9092" # kafka 클러스터의 모든 노드 codec => json topic_id => "system_metric" } }
그 후 Logstash 를 재시작한다.
$ sudo systemctl restart logstash
이제 다시 로그를 확인해보면 에러 문구 없이 정상적으로 CollectD 에서 수집한 데이터를 Kafka 로 넘기는 것을 확인할 수 있다.
$ tail -f /var/log/logstash/logstash-plain.log .... [2021-09-30T08:54:52,674][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0 [2021-09-30T08:54:52,675][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f [2021-09-30T08:54:52,728][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#"} [2021-09-30T08:54:52,814][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2021-09-30T08:54:52,950][INFO ][logstash.inputs.udp ] Starting UDP listener {:address=>"0.0.0.0:25826"} [2021-09-30T08:54:53,045][INFO ][logstash.inputs.udp ] UDP listener started {:address=>"0.0.0.0:25826", :receive_buffer_bytes=>"106496", :queue_size=>"2000"} [2021-09-30T08:54:53,161][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} [2021-09-30T08:54:54,743][INFO ][logstash.outputs.file ] Opening file {:path=>"/var/log/logstash/input.log"} [2021-09-30T08:54:55,096][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: CSLOEmwKT72WIXFuiNNGBg
참고
from http://gurumee92.tistory.com/294 by ccl(A) rewrite - 2021-09-30 19:00:56