Site Overlay

Spring-Data-Redis动态订阅发布

普通的 Redis 订阅一般以 RedisMessageListenerContainer 作为容器,通过配置类在 Spring 初始化时直接加载要订阅的主题。按这种写法,主题是写死在配置里的,无法在程序运行期间根据请求动态添加,运行时修改也不会生效。

/**
 * Static Redis pub/sub wiring: the subscribed channels are fixed when the
 * Spring context is built.
 */
@Configuration
public class RedisChannelConfig {

    /**
     * Template for reading and writing Redis data. It is not needed for the
     * pub/sub wiring in this class.
     */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        StringRedisTemplate stringTemplate = new StringRedisTemplate(connectionFactory);
        return stringTemplate;
    }

    /**
     * Wraps the message handler in an adapter that invokes its
     * {@code receiveMessage} method reflectively. Other constructor overloads
     * exist; when no method name is given the adapter calls
     * {@code handleMessage} instead.
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceive receiver) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "receiveMessage");
        return adapter;
    }

    /**
     * Builds the listener container and registers the adapter on the topics
     * {@code messagepush} and {@code messagepush3}. More listeners can be
     * added to the same container.
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        String[] topicNames = {"messagepush", "messagepush3"};
        for (String topicName : topicNames) {
            listenerContainer.addMessageListener(listenerAdapter, new PatternTopic(topicName));
        }
        return listenerContainer;
    }
}
//此段代码摘自: https://blog.csdn.net/zhang18024666607/article/details/84392335

如果只是当做固定的消息队列进行订阅发布,上面的配置已经足够;但如果需求是根据前台传入的字段动态地订阅,这种写死在配置类里的方式就无法满足了。这时可以不走 container,而是直接使用 Lettuce 客户端进行订阅:旧版本的 spring-data-redis 默认自带的客户端是 Jedis,新版本已换成 Lettuce,它还提供了异步 API,订阅时不会阻塞业务线程。

package com.miracle.im.service;

import com.miracle.im.pojo.Msg;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Author Diuut
 * @Date 2020/4/13  9:50
 */
@Slf4j
@Service
public class ImService {
    @Autowired
    private MyRedisPubSubListener myRedisPubSubListener;

    public String publish(String consumer, String msg) {
        RedisURI redisUri = RedisURI.Builder.redis("xxxxxx")
                .withPassword("xxxxxx")
                .withDatabase(2)
                .withPort(6379)
                .build();
//        RedisURI redisUri = RedisURI.Builder.redis("127.0.0.1").build();
        RedisClient redisClient = RedisClient.create(redisUri);
        StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();
        RedisPubSubAsyncCommands<String, String> async = connection.async();
        RedisFuture<Long> publish = async.publish(consumer, msg);
        return "执行完毕";
    }

    public String subscribe(String username) {
        RedisURI redisUri = RedisURI.Builder.redis("xxxxxx")
                .withPassword("xxxxxx")
                .withDatabase(2)
                .withPort(6379)
                .build();
//        RedisURI redisUri = RedisURI.Builder.redis("127.0.0.1").build();
        RedisClient client = RedisClient.create(redisUri);
        StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
        connection.addListener(myRedisPubSubListener);
        RedisPubSubAsyncCommands<String, String> async = connection.async();
        async.subscribe("Topic_" + username);
        async.subscribe("Topic_server");
//        log.info("future: {}",future);
        return "执行完毕";
    }
}
package com.miracle.im.service;


import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;


import io.lettuce.core.pubsub.RedisPubSubListener;

/**
 * Pub/sub listener that logs every event it receives: incoming messages and
 * changes to channel and pattern subscriptions.
 *
 * className: MyRedisPubSubListener
 */
@Slf4j
@Component
public class MyRedisPubSubListener implements RedisPubSubListener<String, String> {

    /** Logs a message received through a channel subscription. */
    @Override
    public void message(String channel, String message) {
        log.info("msg1={} on channel {}", message, channel);
    }

    /**
     * Logs a message received through a pattern subscription, including the
     * pattern that matched.
     */
    @Override
    public void message(String pattern, String channel, String message) {
        log.info("msg2={} in channel={}, pattern={}", message, channel, pattern);
    }

    /** Logs a channel subscription together with the reported count. */
    @Override
    public void subscribed(String channel, long count) {
        log.info("sub channel={}, count={}", channel, count);
    }

    /** Logs a pattern subscription together with the reported count. */
    @Override
    public void psubscribed(String pattern, long count) {
        log.info("psub pattern={}, count={}", pattern, count);
    }

    /** Logs a channel unsubscription together with the reported count. */
    @Override
    public void unsubscribed(String channel, long count) {
        log.info("unsub channel={}, count={}", channel, count);
    }

    /** Logs a pattern unsubscription together with the reported count. */
    @Override
    public void punsubscribed(String pattern, long count) {
        // The value is a pattern, so label it the same way psubscribed does.
        log.info("punsub pattern={}, count={}", pattern, count);
    }
}

发表回复

您的电子邮箱地址不会被公开。

A believing heart is your magic My heart
欢迎来到Diuut的个人博客,这里是我的一些零零碎碎的知识汇总,希望有能帮到你的内容。 | 蜀ICP备2021011635号-1 | Copyright © 2024 Diuut. All Rights Reserved.