Red de conocimiento informático - Problemas con los teléfonos móviles - pyflink consume mensajes kafka-connect-jdbc (con esquema)

pyflink consume mensajes kafka-connect-jdbc (con esquema)

1. Acceso a datos

Cree un conector para mysql a través de la interfaz restFul de Kafka e inícielo.

{

"name": "mysql_stream_test",

"config": {

"connector.class": "io .confluent.connect.jdbc.JdbcSourceConnector",

"timestamp.column.name":"",

"incrementing.column.name":"ID",

"connection.password": "",

"validate.non. null": true,

"tasks.max": 1,

"batch.max.rows":100,

"table.whitelist":"baseqx.test_demo",

"mode":" incrementando",

"topic.prefix": "mysql_",

"connection.user": ",

"poll.interval.ms": 5000,

"numeric.mapping":" best_fit",

"connection.url":"jdbc:mysql://xxx.xxx.xxx.xxx:3306/baseqx?useUnicode=true&characterEncoding=utf8&. enableMultiQueries=true"

}

2. kafka-connect crea un formato de datos predeterminado en el tema como

{"schema":{"type": " struct", "fields":[{"type":" int32", "optional":false, "field":"ID"},{"type":"string","optional":false," Campo ":"ID"Nombre"},{"Tipo":"int64", "Opcional":false, "nombre":" org.apache.kafka.connect.data.Timestamp", "versión":1, "campo ":"CREATE_TIME"}], "opcional":false, "name":"test_demo"}, "payload":{"ID".1, "NAME":"prestoEtl", "CREATE_TIME":1606902182000 }}

3. Utilice pyflink para consumir información a través de patrones

#! /usr/bin/python3.7

# -*- Codificación: UTF-8 -*-

de pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

de pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()

s_env.set_parallelism(1)

st_env = StreamTableEnvironment.create( s_env, TableConfig())

st_

env.get_ config().set_python_executable("python3")

st_env.use_catalog("default_catalog")

st_env.use_database("default_database" )

# Corregir el esquema como una cadena en DML y usar la función FILA para encapsular la carga útil

ddlKafkaConn = """

crear tabla sourceKafkaConn(

`scheam ` ¿STRING? comentario ' kafkaConn esquema por fila',

`carga útil`? ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING)? Obtener datos con esquema de kafkaConnect'

with(

'connector' = 'kafka',

'topic' = 'mysql_test_demo',

'properties.bootstrap.servers' = '192.168.113.11:9092',

'scan.startup.mode' = 'compensación más temprana',

'formato' = 'json'

)

"""

# 'connector.startup-mode' = 'earliest-offset significa leer el mensaje más antiguo | último-offset significa leer el último mensaje en la cola de mensajes',

st_env.execute_sql( ddlKafkaConn)

fregaderoPrint = '''

CREAR TABLA fregaderoPrint CON ('conector' = ' print')

COMO sourceKafkaConn (EXCLUYENDO TODOS)

'''

st_env.execute_sql(sinkPrint)

st_env.execute_sql("MOSTRAR TABLAS " ).print()

st_env.sql_query("seleccione esquema,ROW( ID,NAME,CREATE_TIME) como carga útil de sourceKafkaConn")

.insert_into("sinkPrint")< / p>

st_env.execute("pyflink-kafka-v4")

4. Ejecución

4.1pythonpyflink-kafka-v4.py

4.2 flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py

5. Resultados de la ejecución

+------- ---- - -----+|nombretabla|+-----------------

+|sinkPrint|

+|sourceKafkaConn|

+------------------+

2 filas en conjunto

+I(null, 1, prestoEtl,1606902182000)

+I(null,2,ejecutado muy bien,1606902562000)

+I(null,3,analizando el esquema del tema usando flink,16070278000)