paho라이브러리는 왜 콜백함수를 사용할까

최근 mqtt와 telegraf, influxdb를 활용하는 팩토리 모니터링 인프라를 학습중이다. 우선 python의 paho 라이브러리를 이용하여 publisher를 작성중이었는데, 다음과 같이 client 객체를 할당할 때 콜백함수를 쓰는 것을 발견했다.

client_legacy = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "sensor_simulator_legacy")

C언어 위주의 학습을 해왔다 보니 비동기 처리를 위한 콜백과 async등에 대해서 잘 아는것은 아니었지만, 콜백이 중첩되고 호출 횟수가 늘어날수록 오버헤드가 증가하고, 복잡도가 증가한다는 것은 명백했다. 그런데 대규모의 mqtt publisher를 상정해야 하는 구조에서 왜 client 객체를 할당하는데 콜백함수를 쓰게 만들었을까? 이번 포스트는 이 의문에서 시작한다.

1. 왜 비동기인가?

사실 이 문제에 대한 대답은 자명하다. mqtt는 그 이름에서 알 수 있듯이 메시지를 queuing 하는 프로토콜이다. 대규모 시스템을 상정해보자. 이상 가능성이 있는 장치는 많고, 이 메시지를 담을 subscriber는 한정적이다. 이 subscriber는 수많은 장치에서 오는 메시지를 거의 동시에 처리해야 한다. 동기적으로 작동하는 코드라면, 어떤 publisher가 응답을 받기 전까지 모든 동작을 멈추고 대기하게 된다. publisher의 처리 방식에 따라 지난 신호와 메시지가 유실될 수 있는 것이다. 당연히 실시간 감시에 치명적이다.

2. 그렇다면 왜 콜백인가?

mqtt가 비동기 처리여야 함은 명백하다. 그렇다면 왜 현대적인 async같은 방법들을 쓰지 않았을까? 첫 번째 이유는 간단하다. paho, mqtt가 오래되었기 때문이다. mqtt의 초기버전은 1999년에 만들어졌고, paho의 초기버전은 2014년에 만들어졌다. 이 당시는 여전히 콜백함수가 대세처럼 사용 되던 시기다. paho도 당연히 자연스럽게 이 흐름을 따랐을 것이다.

두 번째 이유는, mqtt가 이벤트 기반 프로토콜이기 때문이다. subscriber는 어떤 이벤트가 발생하기를 기다리다가, 이벤트가 발생하면 동작한다. 콜백 기반 함수 역시 어떤 이벤트가 발생했을 때 호출되는 함수이다. 직관적으로 가장 잘 매칭되는 함수 형식이라고 볼 수 있다.

세 번째 이유는, 콜백함수가 async 방식에 비해 오버헤드가 적기 때문이다.

Benchmark
# python

import time
import asyncio
import paho.mqtt.client as mqtt
import logging
import warnings

logging.getLogger().setLevel(logging.ERROR)
warnings.filterwarnings("ignore")

BROKER = "test.mosquitto.org"
NUM_MESSAGES = 1000

def test_callback():
published = 0
done = False

def on_publish(client, userdata, mid, reason_code, properties):
    nonlocal published, done
    published += 1
    if published >= NUM_MESSAGES:
        done = True

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_publish = on_publish
client.connect(BROKER)

start = time.time()

for i in range(NUM_MESSAGES):
    client.publish(f"test/{i}", "data", qos=0)

while not done:
    client.loop()

elapsed = time.time() - start
client.disconnect()

print(f"콜백: {elapsed:.3f}초 ({published}개 발행)")
return elapsed

async def test_async():
import aiomqtt

logging.getLogger("aiomqtt").setLevel(logging.ERROR)
logging.getLogger("mqtt").setLevel(logging.ERROR)

start = time.time()
published = 0

try:
    async with aiomqtt.Client(
        BROKER, 
        logger=None,
        timeout=10  # 10초 타임아웃
    ) as client:
        # QoS 0으로 빠르게 발행
        for i in range(NUM_MESSAGES):
            await client.publish(f"test/{i}", "data", qos=0)
            published += 1

            # 100개마다 잠시 양보
            if i % 100 == 0:
                await asyncio.sleep(0.001)

except asyncio.TimeoutError:
    print(f"Async 타임아웃 ({published}개 발행)")
    return None

elapsed = time.time() - start
print(f"Async: {elapsed:.3f}초 ({published}개 발행)")
return elapsed

async def main():
print("=" * 50)
print(f"테스트: {NUM_MESSAGES}개 메시지 (QoS 0)")
print("=" * 50 + "\n")
callback_time = test_callback()
print()
async_time = await test_async()
print()
print("\n" + "=" * 50)
print("📊 결과:")
print(f"콜백: {callback_time:.3f}초")
if async_time:
    print(f"Async: {async_time:.3f}초")
    diff = ((async_time / callback_time) - 1) * 100
    print(f"→ 콜백이 {diff:.1f}% 더 빠름")
print(f"Async 배치: {batch_time:.3f}초")

if name == "main":
asyncio.run(main())

단순하게 대량의 메시지를 처리하는 콜백함수와 async함수를 비교한 결과, 콜백함수가 더 빠른 것을 확인 할 수 있다.

3. 진실

위에서 왜 paho 라이브러리에서 콜백함수를 사용했는지 살펴보았다. 특히 3번에서 콜백함수와 async함수를 비교해본 것은 재미있는 실험이었다. 그러나, 이는 사후적 추론에 가깝다. 최근에는 비동기 런타임과 스케줄러도 발전하고 있고, 여러가지 방식을 응용해서 async가 콜백보다 더 빠르게 작동할 수 있기 때문이다. paho 라이브러리도 이런 점을 수용하여, async 방식을 적용하고 있는 것으로 알고 있다. 그렇다면, 진짜 이유는 자연스럽게도 첫 번째 이유가 진실일 확률이 높지 않을까? 게다가, 오래된 코드라는 것은 그동안 끈질기게 살아남았다는 것이고, 이는 곧 코드의 신뢰성을 방증한다. 컴퓨터 세계에는 잘 돌아가는 것을 굳이 건드리지 말라는 격언이 있다. 그리고, 현장에는 이 콜백함수를 사용하는 무수히 많은 장치가 있다. 아주 잘 돌아가는 장치가 말이다.

얕은 지식에서 출발한 DEEPDIVE 였지만 많이 다뤄보지 않은 콜백함수와 async함수와 더 친해질 수 있는 좋은 기회였다. async함수를 보조하기 위한 방법도 여러가지가 있다고 한다. 시간이 난다면 이 방법들도 다뤄보고 싶다.

코멘트

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다