`
sjkgxf7191
  • 浏览: 252141 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

LCDS:Messaging Service 学习(二)

    博客分类:
  • lcds
阅读更多

Message filtering

 

Messaging Service允许给Producer组件添加message headers 或者增加subtopic 信息。然后Consumer组件在调用subscribe()方法的时候会把它的过滤信息发送回服务器进行消息过滤

 

对于更高级复杂的消息 ,建议使用mx.messaging.MultiTopicConsumer 和mx.messaging.MultiTopicProducer


Using selectors:使用message headers

Producer :message header的名字不能为JMS ”和“DS ”这些保留字符串

var message:AsyncMessage = new AsyncMessage();
message.headers = new Array();
message.headers["prop1"] = 5;
message.body = input.text;
producer.send(message);
 

Consumer :使用Consumer.selector 来制定一个message selector

<mx:Consumer id="consumer"
    destination="chat"
    selector="prop1 > 4"
    message="messageHandler(event);"/>
 

Using subtopics

  • 不能在JMS destination 上使用subtopics 。但是可以在JMS destination 上使用message headers
  • producer .subtopic = "chat.fds.newton"; // 指定Producer派发mesage的subtopic
  • consumer .subtopic = "chat.fds.*"; // 指定Consumer可以接收的subtopic;在consumer.subscribe();之前设置
  • 在destination中允许使用subtopic,默认通配符是“.” ,可以不用指定
<destination id="chat">
	<properties>
		<network>
			<subscription-timeout-minutes>0</subscription-timeout-minutes>
		</network>
		<server>
			<message-time-to-live>0</message-time-to-live>
			<allow-subtopics>true</allow-subtopics>
			<subtopic-separator>.</subtopic-separator>
		</server>
	</properties>
	<channels>
		<channel ref="my-polling-amf"/>
	</channels>
</destination>
 

Configuring the Messaging Service

 

示例

<service id="message-service"
	class="flex.messaging.services.MessageService">
	<adapters>
		<adapter-definition id="actionscript"
			class="flex.messaging.services.messaging.adapters.ActionScriptAdapter"
			default="true"/>
		<adapter-definition id="jms"
			class="flex.messaging.services.messaging.adapters.JMSAdapter"/>
	</adapters>
	<destination id="chat-topic">
		<properties>
			<server>
				<message-time-to-live>0</message-time-to-live>
			</server>
		</properties>
		<channels>
			<channel ref="samples-rtmp"/>
			<channel ref="samples-amf-polling"/>
		</channels>
	</destination>
</service>
 

Configuring the adapter

如果 在destination里不指定 adapter,那么destination会使用缺省的 在service里<adapters>标签中定义的adapter。

给destination自定义引用adapter

<destination id="chat-topic">
    <adapter ref="actionscript"/>
</destination>
 

Defining the destination

  • Setting network properties in the destination
<destination id="chat-topic">
    <properties>
        <network>
            <throttle-inbound policy="ERROR" max-frequency="50"/>
            <throttle-outbound policy="ERROR" max-frequency="500"/>
        </network>
    </properties>
</destination>

subscription-timeout-minutes : 订阅者自动退订时间 ;即N分钟内没有收到message就自动退订;该值默认为0,表示永远不会强制退订。

throttle-inbound :max-frequency属性表示该destination每秒接收 消息的最高值;policy属性表示当达到mesage limit的时候该如何处理:NONE :不采取任何措施;ERROR :不接收多出的mesage并且发送error到客户端;IGNORE :不接收多出的message但不发送error到客户端。

throttle-outbound :max-frequency属性表示该destination每秒发送 消息的最高值;policy属性表示当达到mesage limit的时候该如何处理:NONE :不采取任何措施;ERROR :不发送多出的mesage并且发送error到客户端;IGNORE :不发送多出的 message但不发送error到客户端。

  • Setting server properties in the destination
<destination id="chat-topic">
    <properties>
    ...
        <server>
            <message-time-to-live>0</message-time-to-live>
       </server>
    </properties>
</destination>

allow-subtopics 允许 destination中使用subtopics

cluster-message-routing :destination使用的message-route的模式:server-to-server (default) 和 broadcast 。前者表示只有data message路由到servers,但是subscribe and unsubscribe messages全部通过cluster(集群)来broadcast ;后者表示全部messages全部通过cluster来broadcast。

message-time-to-live :表示某条message的存活时间(毫秒) ,超过这个时间就会被抛弃;0表示这条message永远不会过期。

send-security-constraint :接收消息安全限制,针对Producer 组件,下面有示例代码

subscribe-security-constraint :发送消息安全限制,针对Consumer 组件,下面有示例代码

subtopic-separator :设置subtopic的通配符 ;默认是“.”。

  • Referencing message channels in the destination
<destination id="chat-topic">
    ...
    <channels>
        <channel ref="samples-rtmp"/>
        <channel ref="samples-amf-polling"/>
    </channels>
    ...
</destination>
  • Applying security to the destination
<destination id="chat">
    ...
    <properties>
        <server>
            <send-security-constraint ref="sample-users"/>
            <subscribe-security-constraint ref="sample-users"/>
        </server>
    </properties>
    ...
</destination>
 

Creating a custom Message Service adapter

  • 一个Message Service adapter类必须继承flex.messaging.services.MessageServiceAdapter 类。
  • 需要创建一个flex.messaging.MessageService实例
  • 需要一个invoke() 方法,接收客户端传过来的消息,再将处理好的消息发送回客户端。
  • MessageService.pushMessageToClients ()方法向客户端发送消息。第一个参数是消息对象,第二个参数是布尔值,表示是否使用消息选择器语句。
  • MessageService.sendPushMessageFromPeer ()方法用来在集群环境 下向每个服务器节点广播消息。第一个参数是消息对象,第二个参数是布尔值,表示是否使用消息选择器语句。
package com;

import java.util.Random;
import flex.messaging.MessageBroker;
import flex.messaging.io.ArrayCollection;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.services.ServiceAdapter;
import flex.messaging.util.UUIDUtils;

public class RandomNumberGenerator extends ServiceAdapter {

	private static DataGenerator thread;
	
	private static ArrayCollection randomNumbers = new ArrayCollection();
	
	public RandomNumberGenerator()
	{ 
		Random random = new Random();
		for (int i = 0; i < 6; i++)
		{
			randomNumbers.add(new Integer(random.nextInt(100)));
		}
	}
	
	public void start()
	{
		if (thread == null)
		{
			thread = new DataGenerator();
			thread.start();
		}
	}
	
	public void stop()
	{
		thread.running = false;
		thread = null;
	}
	
	public static class DataGenerator extends Thread
	{
		public boolean running = true;
		public void run()
		{ 
			MessageBroker msgBroker = MessageBroker.getMessageBroker(null);
			String clientID = UUIDUtils.createUUID();
			Random random = new Random();
			while (running)
			{
				randomNumbers.clear();
				{
					for (int i = 0; i < 6; i++)
					{
						randomNumbers.add(new Integer(random.nextInt(100)));
					}
				}
				AsyncMessage msg = new AsyncMessage();
				msg.setDestination("RandomDataPush");
				msg.setClientId(clientID);
				msg.setMessageId(UUIDUtils.createUUID());
				msg.setBody(randomNumbers);
				msgBroker.routeMessageToService(msg, null);
				try
				{
					Thread.sleep(1000);
				} catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			}
		}
	}
	
	public Object invoke(Message message)
	{
		if (message.getBody().equals("New"))
		{
			return randomNumbers;
		} else
		{			
			AsyncMessage newMessage = (AsyncMessage) message;
			newMessage.setBody(randomNumbers);
			MessageService msgService = (MessageService) getDestination().getService();
			msgService.pushMessageToClients(newMessage, false);
		}
		return null;
	}
}
<adapters>
    ...
    adapter-definition id="cfgateway" class="foo.bar.SampleMessageAdapter"/>
    ...
</adapters>
<destination id="chat">
	<adapter ref="cfgateway" />
	<channels>
		<channel ref="my-polling-amf"/>
	</channels>
</destination>
5
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics