Real-Time Data Processing from Apache Kafka to GridDB

I’m working on a real-time data processing pipeline where I consume messages from Apache Kafka and store them in GridDB for further analysis. The data consists of IoT sensor readings (e.g., temperature, humidity, pressure) sent to Kafka topics in JSON format. My goal is to consume these messages in real-time and insert them into GridDB efficiently, ensuring data integrity and minimizing latency.
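
For context, a typical message on the topic looks roughly like this (the exact field names and structure below are just what I assume in the code that follows):

{
    "sensor_id": "sensor-001",
    "timestamp": "2024-05-01T10:30:00",
    "data": {"temperature": 22.5, "humidity": 48.2, "pressure": 1013.1}
}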

I’ve set up a basic consumer in Python using the confluent_kafka library to consume messages from Kafka and then insert them into GridDB. However, I’m encountering performance bottlenecks, and sometimes data is not inserted into GridDB as expected. Below is the code snippet for the Kafka consumer and GridDB insertion logic:

from confluent_kafka import Consumer, KafkaError
import griddb_python as griddb
import json
from datetime import datetime

# Kafka Consumer setup
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'iot-sensor-group',
    'auto.offset.reset': 'earliest'
}
kafka_consumer = Consumer(conf)
kafka_consumer.subscribe(['iot_sensor_data'])

# GridDB setup
factory = griddb.StoreFactory.get_instance()
gridstore = factory.get_store(
    host='localhost',
    port=10001,
    cluster_name='defaultCluster',
    username='<username>',
    password='<password>'
)

container_name = "sensor_data"
# put_container expects a ContainerInfo: a COLLECTION container with sensor_id as the row key
container_info = griddb.ContainerInfo(container_name,
                                      [["sensor_id", griddb.Type.STRING],
                                       ["timestamp", griddb.Type.TIMESTAMP],
                                       ["data", griddb.Type.STRING]],
                                      griddb.ContainerType.COLLECTION, True)
cont = gridstore.put_container(container_info)

# Consume messages from Kafka and insert into GridDB
try:
    while True:
        msg = kafka_consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(msg.error())
                break

        message = json.loads(msg.value().decode('utf-8'))
        sensor_id = message['sensor_id']
        # GridDB TIMESTAMP columns need a datetime, not the raw JSON string
        # (assuming the producer sends ISO 8601 timestamps)
        timestamp = datetime.fromisoformat(message['timestamp'])
        data = json.dumps(message['data'])

        cont.put([sensor_id, timestamp, data])

except Exception as e:
    print(f"Error: {e}")
finally:
    # Clean up
    kafka_consumer.close()

This script is supposed to read messages from a Kafka topic and insert them into a GridDB container.
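
One direction I’ve been considering is buffering decoded rows and flushing them to GridDB in batches with Container.multi_put instead of calling put once per message. A rough, untested sketch of the consume loop (reusing kafka_consumer and cont from above; the batch size is arbitrary):

# Sketch: buffer rows and write them with a single multi_put call per batch
BATCH_SIZE = 500
buffer = []

while True:
    msg = kafka_consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue  # error handling elided for brevity

    message = json.loads(msg.value().decode('utf-8'))
    buffer.append([message['sensor_id'],
                   datetime.fromisoformat(message['timestamp']),
                   json.dumps(message['data'])])

    if len(buffer) >= BATCH_SIZE:
        cont.multi_put(buffer)  # one round trip for the whole batch
        buffer.clear()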

How can I modify the current data save behavior to make it more efficient? Is batching along the lines of the sketch above a reasonable direction, or is there a better pattern for this?