当前位置: 首页 > news >正文

老河口网站定制哪家做网站最便宜

老河口网站定制,哪家做网站最便宜,什么网站做烘干设备好,sync wordpress1、发布订阅 订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。 1) 可以有多个消费者 2) 每个消费者有自己的queue(队列) 3) 每个队列都要绑定到Exchange(交换机&…

1、发布订阅 

订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。

1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

  • 相关场景:邮件群发,群聊天,广播(广告)

2、Exchanges(交换器)

消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费

有几种交换器类型可用:directtopicheaders 和 fanout。我们将集中讨论最后一个——fanout

2.1 创建交换器

创建一个这种类型的交换器,并给它起个名字叫logs

err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments
)

fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。 

2.2 临时队列

也是自动删除队列吗,和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除。

自动删除队列只需要在声明队列时,设置属性auto-delete标识为true即可。系统声明的随机队列,缺省就是自动删除的。

q, err := ch.QueueDeclare("",    // 空字符串作为队列名称false, // 非持久队列false, // delete when unusedtrue,  // 独占队列(当前声明队列的连接关闭后即被删除)false, // no-waitnil,   // arguments
)

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

2.3 交换器与队列绑定

 

交换器和队列之间的关系称为绑定

err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,
)

从现在开始,logs交换器将会把消息添加到我们的队列中。

2.4 发布消息到交换机

例如,发布到fanout交换器:

body := bodyFrom(os.Args)
err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

 3 完整代码

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey,但是对于fanout型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

(emit_logs.go源码)

如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。

如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。

receive_logs.go的代码:

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

(receive_logs.go源码)

如果要将日志保存到文件,只需打开控制台并输入:

go run receive_logs.go > logs_from_rabbit.log

如果希望在屏幕上查看日志,请切换到一个新的终端并运行:

go run receive_logs.go

当然,要发出日志,请输入:

go run emit_log.go

使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。

 

 

http://www.bjxfkj.com.cn/article/110710.html

相关文章:

  • 网站优化站外链接怎么做网站3网合一是怎么做的
  • kotlin做网站外贸网站定制公司
  • 怎么做公司网站竞价两个人做aj的视频教程
  • 旅行社网站建设规划书单位怎样做网站
  • 国外做logo的网站wordpress主题模板百度云
  • 扬中做网站整人图片制作器
  • 音乐网站建设规划outlook企业邮箱官网
  • 做交互的设计网站柳江企业网站建设公司
  • 栗田工业大连有效公司网站哪年做的公司建设网站费用做那个科目
  • 做网站接单的网站鉴定手表网站
  • 交易所开发深圳网站制作上海注册公司买新能源车
  • 网站上社保做增员怎么做手机开发者选项
  • 做ic的电子网站有哪些wordpress备份插件中文
  • 建个网站有什么用龙岗中心医院
  • 遵化市城乡建设局网站网页怎么截图
  • 有关网站建设的图片软文代写新闻稿
  • 网站建设推广价格施工企业风险防控
  • HTML建网站新建一个公司官网
  • 口碑好的高密网站建设wordpress 应用商店模板
  • 播放器网站怎么做毕业设计网站题目
  • 怎样申请免费网站空间招标信息网
  • 网站开发语言作用社区网站搭建
  • 企业做网站乐云seo快速上线上海网站制作平台
  • 网站开发建设流程合肥公司网站开发
  • 什么可以用手机做网站怎么建立网站快捷方式
  • jn建站系统官网论坛网站免费建设模板下载
  • 网站开发环境搭建网络管理系统建设方案
  • 怎么把网页设置为不信任网站国外免费网站模板
  • 一级域名建站网站建设行吗建设银行网站查询密码是啥
  • 南通专业家纺网站建设wordpress联系表格