分享

使用spring cloud stream实现通信及与kafka整合

Oner 2018-1-25 15:34:53 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 32508
问题导读:
1. 如何设置配置文件?
2. 如何定义接口,进行channel绑定?
3. 如何编写输出应用?
4. 如何发送消息?
5. 如何接收消息?
6.如何实现与kafka整合?



最近在工作中会使用到spring cloud stream,网上中文资料几乎没有,阅读官网配置,好不容易搞定,在这里分享一下使用过程,也是自己做一个记录。

主页君自己使用场景是实现两个app之间的通信,第一个app输出数据,第二个app输入数据。这里的app也就是指一个service。

1.配置文件中进行绑定

app1中,配置输出通道相关信息。
application.yml 或者 application.properties或者配置文件中,配置
[mw_shl_code=applescript,true]spring:  
  cloud:  
    stream:  
      bindings:  
        output_channel:       #channelName  
          destination: mydest #destination,或者可以认为是发布-订阅模型里面的topic  
          binder: rabbit1  
      binders:  
        rabbit1:  
          type: rabbit  
          environment:  
            spring:  
              rabbitmq:  
                host: 192.168.1.1   #rabbitMQ服务器地址  
                port: 5672          #rabbitMQ服务器端口  
                username: username  
                password: pwd  
                virtual-host: /hostName  [/mw_shl_code]
app2中,配置输入通道相关信息。[mw_shl_code=applescript,true]spring:  
  cloud:  
    stream:  
      bindings:  
        input_channel:        #<span style="font-family: Arial, Helvetica, sans-serif;">position1。</span><span style="font-family: Arial, Helvetica, sans-serif;">channelName.</span>  
          destination: modest #position2。destination,或者可以认为是发布-订阅模型里面的topic,这里应该与输出app中发布的topic一致,表示订阅该主题  
          binder: rabbit1  
      binders:  
        rabbit1:  
          type: rabbit     #可以是其它,比如kafka  
          environment:  
            spring:  
              rabbitmq:  
                host: 192.168.1.1   #rabbitMQ服务器地址  
                port: 5672          #rabbitMQ服务器端口  
                username: username  
                password: pwd  
                virtual-host: /hostName  [/mw_shl_code]

2.定义接口,进行channel绑定

官网的例子中,使用的是sink和source接口,如果每个app里面只有一个通道,可以直接使用stream自带的接口,但是如果要实现多个接口,就需要自己进行接口定义了。下面是我自己定义的接口例子,具体用法会在注释中详细说明。

[mw_shl_code=java,true]import org.springframework.cloud.stream.annotation.Input;  
import org.springframework.cloud.stream.annotation.Output;  
import org.springframework.messaging.MessageChannel;  
import org.springframework.messaging.SubscribableChannel;  
  
public interface Barista {  
  
    String INPUT_CHANNEL = "input_channel";  #position3  
    String OUTPUT<span style="font-family: Arial, Helvetica, sans-serif;">_CHANNEL</span> = "output_channel";  
  
    String INPUT1 = "input1";  
    String OUTPUT1 = "output1";  
    #注解@Input声明了它是一个输入类型的通道,名字是<span style="font-family: Arial, Helvetica, sans-serif;">Barista</span><span style="font-family: Arial, Helvetica, sans-serif;">.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题  
</span>    @Input(<span style="font-family: Arial, Helvetica, sans-serif;">Barista</span><span style="font-family: Arial, Helvetica, sans-serif;">.INPUT_CHANNEL</span><span style="font-family: Arial, Helvetica, sans-serif;">)</span>  
    SubscribableChannel logInput();  
    #注解@Output声明了它是一个输出类型的通道,名字是<span style="font-family: Arial, Helvetica, sans-serif;">output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。</span>  
    @Output(Barista.<span style="font-family: Arial, Helvetica, sans-serif;">OUTPUT</span><span style="font-family: Arial, Helvetica, sans-serif;">_CHANNEL</span><span style="font-family: Arial, Helvetica, sans-serif;">)</span>  
    MessageChannel logOutPut();  
  
    @Input(Barista.INPUT1)  
    SubscribableChannel input1();  
  
    @Output(Barista.OUTPUT1)  
    MessageChannel output1();  
      
}  [/mw_shl_code]
这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。
通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

3.在输出应用的Application.java类中注入上述Barista接口

[mw_shl_code=java,true]import Barista;  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;  
import org.springframework.cloud.netflix.feign.EnableFeignClients;  
import org.springframework.cloud.stream.annotation.EnableBinding;  
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;  
  
/**
*  
*/  
@SpringBootApplication  
@EnableDiscoveryClient  
@EnableFeignClients  
@EnableRedisHttpSession  
@EnableBinding(Barista.class)  
public class OutputServiceApplication {  
    public static void main(String[] args) {  
        SpringApplication.run(<span style="font-family: Arial, Helvetica, sans-serif;">OutputServiceApplication</span><span style="font-family: Arial, Helvetica, sans-serif;">.class, args);</span>  
    }  
}  [/mw_shl_code]

这里需要在Application类中注入接口,否则在引用接口的时候,会提示找不到bean。原因是stream只有找到@EnableBinding注解,才会自动给其中的参数Barista.class创建实例。这是spring cloud stream自带的操作,需要遵守。

4.发送消息

[mw_shl_code=java,true]import Barista;  
import org.apache.commons.logging.Log;  
import org.apache.commons.logging.LogFactory;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.messaging.support.MessageBuilder;  
import org.springframework.stereotype.Component;  
import org.springframework.stereotype.Service;  
  
@Service  
@Component  
public class RabbitSender {  
  
    @Autowired  
    private Barista source;  
  
    // 发送消息  
    public String sendMessage(){  
        try{  
        Obj obj = new Obj();  
            source.logOutPut().send(MessageBuilder.withPayload(obj).build());  
            logger.info("发送");  
  
        }catch (Exception e){  
            e.printStackTrace();  
        }  
        return null;  
    }  
}  [/mw_shl_code]

source是一个实例化的Barista对象,调用其中的logOutPut()方法实际上是调用了一个MessageChannel,并使用它将消息发送出去。

5.接收消息

在输入应用中注入接口,并声明监听
[mw_shl_code=java,true]import Barista;  
import org.apache.commons.logging.Log;  
import org.apache.commons.logging.LogFactory;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.cloud.stream.annotation.EnableBinding;  
import org.springframework.cloud.stream.annotation.StreamListener;  
import org.springframework.messaging.Message;  
  
@EnableBinding(Barista.class)  
public class RabbitReceiver {  
    private static final Log logger = LogFactory.getLog(RabbitReceiver.class);  
  
    @StreamListener(Barista.LOG_INPUT)  
    public void receiver(Message<Obj> message){  
        Obj obj = message.getPayload();  
        logger.info("接受对象:"+<span style="font-family: Arial, Helvetica, sans-serif;">obj</span><span style="font-family: Arial, Helvetica, sans-serif;">+"\n");</span>  
    }  
}  [/mw_shl_code]

首先,在类上添加注解@EnableBinding(Barista.class),实现与消息代理的连接。在方法receiver()上添加注解@StreamListener,并以通道名称作为参数,可以实现对相应通道的监听。

至此,实现了两个app之间进行消息传递。

##################
spring cloud还可以与kafka整合。下面是整合配置

1、在pom.xml里面添加kafka的maven依赖
[mw_shl_code=xml,true]<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>[/mw_shl_code]


2、在properties 配置文件里面添加 kafka binder 参数
[mw_shl_code=bash,true]spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092
spring.cloud.stream.kafka.binder.zk-nodes=127.0.0.1:2181
spring.cloud.stream.kafka.binder.minPartitionCount=1
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.autoAddPartitions=true[/mw_shl_code]


3、输入通道定义,供消费者使用


(1)在properties配置文件里面添加输入通道配置信息
[mw_shl_code=bash,true]spring.cloud.stream.bindings.testa.destination=test_spring_stream  
spring.cloud.stream.bindings.testa.group=group-1  
spring.cloud.stream.bindings.testa.consumer.concurrency=1  
spring.cloud.stream.bindings.testa.consumer.partitioned=false  [/mw_shl_code]


(2)定义输入通道并绑定输入通道配置信息

[mw_shl_code=java,true]public interface Sink {  
      
    //接收队列1  
    String INPUT_1 = "testa";  
  
    @Input(Sink.INPUT_1)  
    SubscribableChannel input1();  
  
}  [/mw_shl_code]

INPUT_1 = "testa" 跟配置文件里面的通道名称 testa 保持一致


4、输出通道定义,供生产者使用

(1)在properties配置文件里面添加输出通道配置信息

[mw_shl_code=bash,true]spring.cloud.stream.bindings.sourceA.destination=test_spring_stream  
spring.cloud.stream.bindings.sourceA.producer.partitionCount=1  [/mw_shl_code]


(2)定义输出通道并绑定输出通道配置信息
[mw_shl_code=bash,true]public interface Source {  
      
    //发送队列1  
    String OUTPUT_1 = "sourceA";  
      
    @Output(Source.OUTPUT_1)  
    MessageChannel output1();  
      
}  [/mw_shl_code]

OUTPUT_1 = "sourceA" 跟配置文件里面的通道名称 sourceA 保持一致


5、生产者端代码
[mw_shl_code=java,true]@EnableBinding(Source.class)  
public class KafkaSender {  
      
    private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);  
      
    @Autowired  
    private Source source;  
      
    public void sendMessage(String message) {  
    try {  
        source.output1().send(MessageBuilder.withPayload("message: " + message).build());  
    } catch (Exception e) {  
        logger.info("消息发送失败,原因:"+e);  
        e.printStackTrace();  
    }  
    }  
}  [/mw_shl_code]
调用sendMessage方法发送消息



6、消费者端代码
[mw_shl_code=java,true]
@EnableBinding(Sink.class)
public class KafkaReceiver {

        private final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);

        @StreamListener(Sink.INPUT_1)
        private void receive(String vote) {
                logger.info("receive message : " + vote);
        }
       
}[/mw_shl_code]
通过receive方法接收消息




来源:http://blog.csdn.net/phyllisy/article/details/51382018
作者:phyllisy

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条