pyflink consume mensajes kafka-connect-jdbc (con esquema)
Cree un conector para mysql a través de la interfaz restFul de Kafka e inícielo.
{
"name": "mysql_stream_test",
"config": {
"connector.class": "io .confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name":"",
"incrementing.column.name":"ID",
"connection.password": "",
"validate.non. null": true,
"tasks.max": 1,
"batch.max.rows":100,
"table.whitelist":"baseqx.test_demo",
"mode":" incrementando",
"topic.prefix": "mysql_",
"connection.user": ",
"poll.interval.ms": 5000,
"numeric.mapping":" best_fit",
"connection.url":"jdbc:mysql://xxx.xxx.xxx.xxx:3306/baseqx?useUnicode=true&characterEncoding=utf8&. enableMultiQueries=true"
}
2. kafka-connect crea un formato de datos predeterminado en el tema como
{"schema":{"type": " struct", "fields":[{"type":" int32", "optional":false, "field":"ID"},{"type":"string","optional":false," Campo ":"ID"Nombre"},{"Tipo":"int64", "Opcional":false, "nombre":" org.apache.kafka.connect.data.Timestamp", "versión":1, "campo ":"CREATE_TIME"}], "opcional":false, "name":"test_demo"}, "payload":{"ID".1, "NAME":"prestoEtl", "CREATE_TIME":1606902182000 }}
3. Utilice pyflink para consumir información a través de patrones
#! /usr/bin/python3.7
# -*- Codificación: UTF-8 -*-
de pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
de pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
st_env = StreamTableEnvironment.create( s_env, TableConfig())
st_
env.get_ config().set_python_executable("python3")
st_env.use_catalog("default_catalog")
st_env.use_database("default_database" )
# Corregir el esquema como una cadena en DML y usar la función FILA para encapsular la carga útil
ddlKafkaConn = """
crear tabla sourceKafkaConn(
`scheam ` ¿STRING? comentario ' kafkaConn esquema por fila',
`carga útil`? ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING)? Obtener datos con esquema de kafkaConnect'
with(
'connector' = 'kafka',
'topic' = 'mysql_test_demo',
'properties.bootstrap.servers' = '192.168.113.11:9092',
'scan.startup.mode' = 'compensación más temprana',
'formato' = 'json'
)
"""
# 'connector.startup-mode' = 'earliest-offset significa leer el mensaje más antiguo | último-offset significa leer el último mensaje en la cola de mensajes',
st_env.execute_sql( ddlKafkaConn)
fregaderoPrint = '''
CREAR TABLA fregaderoPrint CON ('conector' = ' print')
COMO sourceKafkaConn (EXCLUYENDO TODOS)
'''
st_env.execute_sql(sinkPrint)
st_env.execute_sql("MOSTRAR TABLAS " ).print()
st_env.sql_query("seleccione esquema,ROW( ID,NAME,CREATE_TIME) como carga útil de sourceKafkaConn")
.insert_into("sinkPrint")< / p>
st_env.execute("pyflink-kafka-v4")
4. Ejecución
4.1pythonpyflink-kafka-v4.py
4.2 flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py
5. Resultados de la ejecución
+------- ---- - -----+|nombretabla|+-----------------
+|sinkPrint|
+|sourceKafkaConn|
+------------------+
2 filas en conjunto
+I(null, 1, prestoEtl,1606902182000)
+I(null,2,ejecutado muy bien,1606902562000)
+I(null,3,analizando el esquema del tema usando flink,16070278000)