| 除了看日志,我们可以用命令显示的查询我们是否成功的创建了flink-topic,如下: jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181  flink-tipic 
 如果输出flink-tipic那么说明我们的Topic成功创建了。 那么Topic是保存在哪里?Kafka是怎样进行消息的发布和订阅的呢?为了直观,我们看如下Kafka架构示意图简单理解一下: 
 简单介绍一下,Kafka利用ZooKeeper来存储集群信息,也就是上面我们启动的Kafka Server 实例,一个集群中可以有多个Kafka  Server 实例,Kafka  Server叫做Broker,我们创建的Topic可以在一个或多个Broker中。Kafka利用Push模式发送消息,利用Pull方式拉取消息。 3. 发送消息 如何向已经存在的Topic中发送消息呢,当然我们可以API的方式编写代码发送消息。同时,还可以利用命令方式来便捷的发送消息,如下: jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic >Kafka test msg >Kafka connector 
 上面我们发送了两条消息Kafka test msg 和 Kafka connector 到 flink-topic Topic中。 4. 读取消息 如果读取指定Topic的消息呢?同样可以API和命令两种方式都可以完成,我们以命令方式读取flink-topic的消息,如下: jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning Kafka test msg Kafka connector 
 其中--from-beginning 描述了我们从Topic开始位置读取消息。 三、Flink Kafka Connector 前面我们以最简单的方式安装了Kafka环境,那么我们以上面的环境介绍Flink Kafka Connector的使用。Flink  Connector相关的基础知识会在《Apache Flink 漫谈系列(14) - Connectors》中介绍,这里我们直接介绍与Kafka  Connector相关的内容。 Apache Flink 中提供了多个版本的Kafka Connector,本篇以flink-1.7.0版本为例进行介绍。 1. mvn 依赖 要使用Kakfa Connector需要在我们的pom中增加对Kafka Connector的依赖,如下: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.7.0</version> </dependency> 
 Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。  DeserializationSchema允许用户指定这样的模式。 为每个Kafka消息调用 T deserialize(byte []  message)方法,从Kafka传递值。 2. Examples 我们示例读取Kafka的数据,再将数据做简单处理之后写入到Kafka中。我们需要再创建一个用于写入的Topic,如下: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output 
 所以示例中我们Source利用flink-topic, Sink用slink-topic-output。 (1) Simple ETL 我们假设Kafka中存储的就是一个简单的字符串,所以我们需要一个用于对字符串进行serialize和deserialize的实现,也就是我们要定义一个实现DeserializationSchema和SerializationSchema  的序列化和反序列化的类。因为我们示例中是字符串,所以我们自定义一个KafkaMsgSchema实现类,然后在编写Flink主程序。 主程序 - 完整代码import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Preconditions;  import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.charset.Charset;  public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {     private static final long serialVersionUID = 1L;     private transient Charset charset;      public KafkaMsgSchema() { // 默认UTF-8编码         this(Charset.forName("UTF-8"));     }      public KafkaMsgSchema(Charset charset) {         this.charset = Preconditions.checkNotNull(charset);     }      public Charset getCharset() {         return this.charset;     }      public String deserialize(byte[] message) { // 将Kafka的消息反序列化为java对象         return new String(message, charset);     }      public boolean isEndOfStream(String nextElement) { // 流永远不结束         return false;     }      public byte[] serialize(String element) { // 将java对象序列化为Kafka的消息         return element.getBytes(this.charset);     }      public TypeInformation<String> getProducedType() { // 定义产生的数据Typeinfo         return BasicTypeInfo.STRING_TYPE_INFO;     }      private void writeObject(ObjectOutputStream out) throws IOException {         out.defaultWriteObject();         out.writeUTF(this.charset.name());     }      private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {         in.defaultReadObject();         String charsetName = in.readUTF();         this.charset = Charset.forName(charsetName);     } } 
 import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;  import java.util.Properties;  public class KafkaExample {     public static void main(String[] args) throws Exception {         // 用户参数获取         final ParameterTool parameterTool = ParameterTool.fromArgs(args);         // Stream 环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          // Source的topic         String sourceTopic = "flink-topic";         // Sink的topic         String sinkTopic = "flink-topic-output";         // broker 地址         String broker = "localhost:9092";          // 属性参数 - 实际投产可以在命令行传入         Properties p = parameterTool.getProperties();         p.putAll(parameterTool.getProperties());         p.put("bootstrap.servers", broker);          env.getConfig().setGlobalJobParameters(parameterTool);          // 创建消费者         FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(                 sourceTopic,                 new KafkaMsgSchema(),                 p);         // 设置读取最早的数据 //        consumer.setStartFromEarliest();          // 读取Kafka消息         DataStream<String> input = env.addSource(consumer);           // 数据处理         DataStream<String> result = input.map(new MapFunction<String, String>() {             public String map(String s) throws Exception {                 String msg = "Flink study ".concat(s);                 System.out.println(msg);                 return msg;             }         });          // 创建生产者         FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(                 sinkTopic,                 new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),                 p,                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);          // 将数据写入Kafka指定Topic中         result.addSink(producer);          // 执行job         env.execute("Kafka Example");     } } 
 (编辑:鹰潭站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |