
Configuring Dispatcher Threads and Order Policy for Event Distribution

By default, Geode uses multiple dispatcher threads to process region events simultaneously in a gateway sender queue for distribution between sites, or in an asynchronous event queue for distributing events for write-behind caching. With serial queues, you can also configure the ordering policy for dispatching those events.

Each gateway sender queue or asynchronous event queue uses 5 dispatcher threads unless you configure a different number. This default suits applications that can process queued events concurrently for distribution to another Geode site or listener. If your application does not require concurrent distribution, or if you do not have enough resources to support multiple dispatcher threads, you can configure a single dispatcher thread to process a queue.

Using Multiple Dispatcher Threads to Process a Queue

When multiple dispatcher threads are configured for a parallel queue, Geode simply uses multiple threads to process the contents of each individual queue. The total number of queues that are created is still determined by the number of Geode members that host the region.

When multiple dispatcher threads are configured for a serial queue, Geode creates an additional copy of the queue for each thread on each member that hosts the queue. To obtain the maximum throughput, increase the number of dispatcher threads until your network is saturated.
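The "one queue copy per dispatcher thread" model can be sketched in plain Java. This is an illustration only and does not use or reproduce Geode internals: the class name SerialQueueCopiesSketch, the "key:sequence" event format, and the hash-based choice of a queue copy are all assumptions made for the example. Each single-threaded executor stands in for one queue copy together with its dispatcher thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SerialQueueCopiesSketch {

    // Sends each event (hypothetical format "key:sequence") through one of
    // `dispatcherThreads` queue copies and returns the events in the order
    // in which they were dispatched.
    public static List<String> dispatch(List<String> events, int dispatcherThreads) {
        if (dispatcherThreads < 1) {
            throw new IllegalArgumentException("dispatcherThreads must be at least 1");
        }
        // One single-threaded executor models one queue copy plus its dispatcher thread.
        List<ExecutorService> copies = new ArrayList<>();
        for (int i = 0; i < dispatcherThreads; i++) {
            copies.add(Executors.newSingleThreadExecutor());
        }
        List<String> delivered = new CopyOnWriteArrayList<>();
        for (String event : events) {
            String key = event.substring(0, event.indexOf(':'));
            // Assumption: the queue copy is chosen from the key's hash code.
            int copy = Math.floorMod(key.hashCode(), dispatcherThreads);
            copies.get(copy).execute(() -> delivered.add(event));
        }
        // Wait for every dispatcher to drain its queue copy.
        for (ExecutorService copy : copies) {
            copy.shutdown();
            try {
                if (!copy.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("dispatcher did not drain in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while draining", e);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("A:1", "B:1", "A:2", "C:1", "B:2", "A:3");
        // Interleaving across keys may differ from run to run.
        System.out.println(dispatch(events, 3));
    }
}
```

In this sketch, events that land in the same queue copy keep their relative order, while events in different copies may be dispatched in any interleaving. That is the trade-off that multiple dispatcher threads introduce for a serial queue.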

The following diagram illustrates a serial gateway sender queue that is configured with multiple dispatcher threads.

Performance and Memory Considerations

When a serial gateway sender or an asynchronous event queue uses multiple dispatcher threads, consider the following:

  • Queue attributes are repeated for each copy of the queue that is created for a dispatcher thread. That is, each concurrent queue points to the same disk store, so the same disk directories are used. If persistence is enabled and overflow occurs, the threads that insert entries into the queues compete for the disk. This applies to application threads and dispatcher threads, so it can affect application performance.
  • The maximum-queue-memory setting applies to each copy of the serial queue. If you configure 10 dispatcher threads and the maximum queue memory is set to 100MB, then the total maximum queue memory for the queue is 1000MB on each member that hosts the queue.
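The memory arithmetic in the second item can be written out as a short Java sketch. It is not part of the Geode API: the class QueueMemoryEstimate and the method totalMaxQueueMemoryMb are hypothetical names, and the result is only the configured upper bound per member, not measured usage.

```java
public class QueueMemoryEstimate {

    // Upper bound, in MB, on queue memory per member for a serial queue.
    // Each dispatcher thread has its own copy of the queue, and the
    // maximum-queue-memory setting applies to every copy.
    public static long totalMaxQueueMemoryMb(int dispatcherThreads, long maximumQueueMemoryMb) {
        if (dispatcherThreads < 1 || maximumQueueMemoryMb < 0) {
            throw new IllegalArgumentException("invalid dispatcher thread count or memory limit");
        }
        return Math.multiplyExact((long) dispatcherThreads, maximumQueueMemoryMb);
    }

    public static void main(String[] args) {
        // 10 dispatcher threads with maximum-queue-memory of 100 MB.
        System.out.println(totalMaxQueueMemoryMb(10, 100)); // prints 1000
    }
}
```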

Configuring the Ordering Policy for Serial Queues

When a serial event queue uses more than one dispatcher thread (dispatcher-threads greater than 1), you can also configure the order-policy that those threads use to distribute events from the queue. The valid order policy values are:

  • key (default). All updates to the same key are distributed in order. Geode preserves key ordering by placing all updates to the same key in the same dispatcher thread queue. You typically use key ordering when updates to entries have no relationship to each other, such as for an application that uses a single feeder to distribute stock updates to several other systems.
  • thread. All region updates from a given thread are distributed in order. Geode preserves thread ordering by placing all region updates from the same thread into the same dispatcher thread queue. In general, use thread ordering when updates to one region entry affect updates to another region entry.
  • partition. All region events that share the same partitioning key are distributed in order. Specify partition ordering when applications use a PartitionResolver to implement custom partitioning. With partition ordering, all entries that share the same “partitioning key” (RoutingObject) are placed into the same dispatcher thread queue.
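The three policies differ only in which attribute of an event decides the dispatcher thread queue it is placed in. The following Java sketch illustrates that idea under stated assumptions: the Event class, the Policy enum, and the hash-modulo selection in queueIndex are hypothetical and are not Geode's actual implementation.

```java
import java.util.Objects;

public class OrderPolicySketch {

    public enum Policy { KEY, THREAD, PARTITION }

    // Hypothetical event carrying the three attributes the policies look at.
    public static final class Event {
        final Object key;            // region entry key
        final long originThreadId;   // thread that performed the update
        final Object routingObject;  // partitioning key from a PartitionResolver

        public Event(Object key, long originThreadId, Object routingObject) {
            this.key = key;
            this.originThreadId = originThreadId;
            this.routingObject = routingObject;
        }
    }

    // Picks a dispatcher thread queue so that events sharing the attribute
    // selected by the policy always map to the same queue.
    public static int queueIndex(Policy policy, Event event, int dispatcherThreads) {
        if (dispatcherThreads < 1) {
            throw new IllegalArgumentException("dispatcherThreads must be at least 1");
        }
        Object discriminator;
        switch (policy) {
            case KEY:
                discriminator = event.key;
                break;
            case THREAD:
                discriminator = Long.valueOf(event.originThreadId);
                break;
            case PARTITION:
                discriminator = event.routingObject;
                break;
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
        // Assumption: a simple hash-modulo mapping stands in for the real selection logic.
        return Math.floorMod(Objects.hashCode(discriminator), dispatcherThreads);
    }

    public static void main(String[] args) {
        Event first = new Event("IBM", 1L, "NYSE");
        Event second = new Event("IBM", 2L, "NASDAQ");
        // Same key, so both events map to the same queue under KEY ordering.
        System.out.println(queueIndex(Policy.KEY, first, 7) == queueIndex(Policy.KEY, second, 7));
    }
}
```

Because events with the same discriminator always land in the same queue, and each queue is drained by a single dispatcher thread, their relative order is preserved. Events with different discriminators may be distributed in any order relative to each other.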

You cannot configure the order-policy for a parallel event queue, because parallel queues cannot preserve event ordering for regions. Only the ordering of events for a given partition (or in a given queue of a distributed region) can be preserved.

Examples—Configuring Dispatcher Threads and Ordering Policy for a Serial Gateway Sender Queue

To increase the number of dispatcher threads and set the ordering policy for a serial gateway sender, use one of the following mechanisms.

  • cache.xml configuration

    <cache>
      <gateway-sender id="NY" parallel="false" 
       remote-distributed-system-id="1"
       enable-persistence="true"
       disk-store-name="gateway-disk-store"
       maximum-queue-memory="200"
       dispatcher-threads="7" order-policy="key"/> 
       ... 
    </cache>
    
  • Java API configuration

    Cache cache = new CacheFactory().create();
    
    GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
    gateway.setParallel(false);
    gateway.setPersistenceEnabled(true);
    gateway.setDiskStoreName("gateway-disk-store");
    gateway.setMaximumQueueMemory(200);
    gateway.setDispatcherThreads(7);
    gateway.setOrderPolicy(GatewaySender.OrderPolicy.KEY);
    // The remote distributed system ID is passed as an int, not a String.
    GatewaySender sender = gateway.create("NY", 1);
    sender.start();
    
  • gfsh:

    gfsh>create gateway-sender --id="NY" 
       --parallel=false 
       --remote-distributed-system-id="1"
       --enable-persistence=true
       --disk-store-name="gateway-disk-store"
       --maximum-queue-memory=200
       --dispatcher-threads=7 
       --order-policy="key"
    

The following examples show how to set dispatcher threads and ordering policy for an asynchronous event queue:

  • cache.xml configuration

    <cache>
       <async-event-queue id="sampleQueue" persistent="true"
        disk-store-name="async-disk-store" parallel="false"
        dispatcher-threads="7" order-policy="key">
          <async-event-listener>
             <class-name>MyAsyncEventListener</class-name>
             <parameter name="url"> 
               <string>jdbc:db2:SAMPLE</string> 
             </parameter> 
             <parameter name="username"> 
               <string>gfeadmin</string> 
             </parameter> 
             <parameter name="password"> 
               <string>admin1</string> 
             </parameter> 
        </async-event-listener>
        </async-event-queue>
    ...
    </cache>
    
  • Java API configuration

    Cache cache = new CacheFactory().create();
    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
    factory.setPersistent(true);
    factory.setDiskStoreName("async-disk-store");
    factory.setParallel(false);
    factory.setDispatcherThreads(7);
    factory.setOrderPolicy(GatewaySender.OrderPolicy.KEY);
    AsyncEventListener listener = new MyAsyncEventListener();
    AsyncEventQueue sampleQueue = factory.create("sampleQueue", listener);
    

    Entry updates in the current, in-process batch are not eligible for conflation.

  • gfsh:

    gfsh>create async-event-queue --id="sampleQueue" --persistent=true
    --disk-store="async-disk-store" --parallel=false
    --dispatcher-threads=7 --order-policy="key"
    --listener=MyAsyncEventListener 
    --listener-param=url#jdbc:db2:SAMPLE 
    --listener-param=username#gfeadmin 
    --listener-param=password#admin1