From 5cdae049e3419667b5646d83e685f22b7c1e061c Mon Sep 17 00:00:00 2001 From: rpopma Date: Sun, 19 Jun 2016 20:19:48 +0900 Subject: [PATCH 1/6] Added TimeoutBlockingWaitStrategy as a variation of the TimeoutBlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended. --- .../LiteTimeoutBlockingWaitStrategy.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java diff --git a/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java b/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java new file mode 100644 index 000000000..7098fc1fb --- /dev/null +++ b/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java @@ -0,0 +1,83 @@ +package com.lmax.disruptor; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Variation of the {@link TimeoutBlockingWaitStrategy} that attempts to elide conditional wake-ups + * when the lock is uncontended. + */ +public class LiteTimeoutBlockingWaitStrategy implements WaitStrategy +{ + private final Lock lock = new ReentrantLock(); + private final Condition processorNotifyCondition = lock.newCondition(); + private final AtomicBoolean signalNeeded = new AtomicBoolean(false); + private final long timeoutInNanos; + + public LiteTimeoutBlockingWaitStrategy(final long timeout, final TimeUnit units) + { + timeoutInNanos = units.toNanos(timeout); + } + + @Override + public long waitFor( + final long sequence, + final Sequence cursorSequence, + final Sequence dependentSequence, + final SequenceBarrier barrier) + throws AlertException, InterruptedException, TimeoutException + { + long nanos = timeoutInNanos; + + long availableSequence; + if (cursorSequence.get() < sequence) + { + lock.lock(); + try + { + while (cursorSequence.get() < sequence) + { + signalNeeded.getAndSet(true); + + barrier.checkAlert(); + nanos = processorNotifyCondition.awaitNanos(nanos); + if (nanos <= 0) + { + throw TimeoutException.INSTANCE; + } + } + } + finally + { + lock.unlock(); + } + } + + while ((availableSequence = dependentSequence.get()) < sequence) + { + barrier.checkAlert(); + } + + return availableSequence; + } + + @Override + public void signalAllWhenBlocking() + { + if (signalNeeded.getAndSet(false)) + { + lock.lock(); + try + { + processorNotifyCondition.signalAll(); + } + finally + { + lock.unlock(); + } + } + } +} From 2851cf8cb48519e3fb81971428ed32274b3f4662 Mon Sep 17 00:00:00 2001 From: rpopma Date: Sun, 19 Jun 2016 20:20:27 +0900 Subject: [PATCH 2/6] Test for the timeout behaviour of the LiteTimeoutBlockingWaitStrategy. --- .../LiteTimeoutBlockingWaitStrategyTest.java | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 src/test/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategyTest.java diff --git a/src/test/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategyTest.java b/src/test/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategyTest.java new file mode 100644 index 000000000..1abef842d --- /dev/null +++ b/src/test/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategyTest.java @@ -0,0 +1,54 @@ +package com.lmax.disruptor; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; + +import org.jmock.Expectations; +import org.jmock.Mockery; +import org.jmock.integration.junit4.JMock; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(JMock.class) +public class LiteTimeoutBlockingWaitStrategyTest +{ + private final Mockery mockery = new Mockery(); + + @Test + public void shouldTimeoutWaitFor() throws Exception + { + final SequenceBarrier sequenceBarrier = mockery.mock(SequenceBarrier.class); + + long theTimeout = 500; + LiteTimeoutBlockingWaitStrategy waitStrategy = new LiteTimeoutBlockingWaitStrategy(theTimeout, TimeUnit.MILLISECONDS); + Sequence cursor = new Sequence(5); + Sequence dependent = cursor; + + mockery.checking( + new Expectations() + { + { + allowing(sequenceBarrier).checkAlert(); + } + }); + + long t0 = System.currentTimeMillis(); + + try + { + waitStrategy.waitFor(6, cursor, dependent, sequenceBarrier); + fail("TimeoutException should have been thrown"); + } + catch (TimeoutException e) + { + } + + long t1 = System.currentTimeMillis(); + + long timeWaiting = t1 - t0; + + assertThat(timeWaiting, greaterThanOrEqualTo(theTimeout)); + } +} From 550faa4a4418d4b4b1afef6267499e95d5487b7b Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Fri, 15 Jul 2016 14:29:46 +1200 Subject: [PATCH 3/6] Upgrade gradle. --- gradle/wrapper/gradle-wrapper.jar | Bin 52271 -> 53637 bytes gradle/wrapper/gradle-wrapper.properties | 2 +- gradlew | 10 +-- .../lmax/disruptor/example/KeyedBatching.java | 42 ++++++++++++ .../example/SequentialThreeConsumers.java | 62 ++++++++++++++++++ 5 files changed, 108 insertions(+), 8 deletions(-) create mode 100644 src/test/java/com/lmax/disruptor/example/KeyedBatching.java create mode 100644 src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 30d399d8d2bf522ff5de94bf434a7cc43a9a74b5..05ef575b0cd0173fc735f2857ce4bd594ce4f6bd 100644 GIT binary patch delta 12543 zcmZX41ytM3@^)}3?(QzdiWDiOxVyVUad$%T;t<^3p}4!dyGwC*DAJF&?f>4}@695&{w)0Dy%Bym_l68jVB=`_JgmM@13MqlT$|Zhv79 z2KX=OiSdfAshJyWaLCFm$_SWZ zo`U7J@#H%ZE86TAisoc27Xic_$i}Iynwh1`J)CUSY*!i2Ct8n>t2ziAChTK*;;_L;CcdcxG5P?~1N>?4Jx@wlrkvLPj$J=ITk=biAY|E8?l!30b zb)!*gPIpD=HVY9qbsjTSEq5j*=xZmZNoiKo)eq_v=$&6q*k2Q<*c5%Q<;(iS3H;D=iauo$dwJ+j6d2dR=QUPRmbPaPfpVnY`-lpIxa~L zD*ky}EF#(*V=APKbU3YzV2|q%ZUr>=;q=a$ET$!0&r|i?0~H_a|9Qu(}v+6rS&|A@ou+j+xV(X~qX4kN7*T6^CoVR9;=(DEi>yxo{umCd)=BF)9iKtuz)*Q&jGHCH35{KNRGYAC~5|BhxDMD2yWMdBW;1Efq z_*E{ln7oe^t$+gSfCHtNGGa`OK)iVM6ihC~$IlW`H3$mis=oW5Uxg}}F$6g&YUSmC|gYJ&nCMbse3yfp`~ZI`EVPKYn?23>ep!-kgST#2ZIE$K{oEm&kpuxcLB<+`SE< zHDnA{ylwwb8kMh*Tx_Q`$_~$Te)TB;doZjQEO}fq7g&^$Fz5hm4EBf*OC!uQD8IRa zcPnSuaBx-j8Q-}AcP&wLb@GY#!|prGGmpxbrzu5+-+Oc`yM9OO`K2r4(xEzIUR+et zXKeds$`bZGphbRj4p6pp;k`yNK02iY<@&}(`JOCb*=VW`P_}mh_IWcDa}709WplYJ z;r>D4p+PE{F5m_TsI_bE$^uyJIr9fSTi!Pcem`N@c*`MNGW^`4n2w4{Rcy=fk+6ay z`JU@^mp)gVCKX{q#yYky<|}Q03+)Pergakjo{stib%+k`M#=r>hZc>s4~!ogH8w30 zeG$Vg4mmh&nrgC&oTmHoXHz)Y%WN7Q1y}QHr2q2#P?TMMY8i3iFzb}Tx;l|JvYqL zmi1qy5fAJ(r^+<7od$DJts-G}8m-@Od>`w~o(@}+qlkOwf@?7nERPM3JNTWOFMqje zwn5&EcN07ccvbe!FuykBAZ99Z@@Sj=edB_$W@5|DMMdYk%Ed5T#v-%k=!INEIbIax zgJ?$guGlljg84&MkQAHt`%oJ)R54;$N`zHbyph~q$GQ?|%`|F)C4J|7v?k*|L;4|t zf|Luq%C8M%u{>i+&O^DG@;JxnCLR)PoYZ7e=meK6!0zJ3mP$Cq5c`4}=Y!Drw-(aE zNmQajH-V`F0pA2&t_G~BYH6_`&VrG0x|k(pK3}`#1b;*khBRkJSH}&K%3P|lWNY5R z&xf)MvsRB+x@S8_gBC1;Elnf;xiYn>G_>M7IBk5*TKQEMonToXG#*O!$>jYsNQ5Si zxvVYO3Ft#*yQt@kCoZYt3=17n#hYxfO8dqUkuO9(P}%dOC;~~qd+l}>w(a(Vk3hX+ zJ~*=zgRb1ZUw=j)xN&xX86VvIOh>t;71q(xlwvxp+BvCPec z$nM=H;D=K7Dtf#M)0Ru_GWbMbUaLto6Cxwp?}4eybn4B+(R_qqH`*d*rV5_bHI6kk z)U(ib#X^Kk9f9rx5(lWu(@=x1bf~PBu90l+&EZkT{wJuwo-Y_m(<_@%CB>?;6uYD* zb&)kGoCZD2QKs3nX6!tMO>q)9IoixI`Jv+Z)QBaO*qK#5A^2fJGg4Gg1Hq-yGj*`T zMnEj`eU}P-e@;C21@UNcdIDK>b&{ph`5tun)v9B-HCP3_eaCuozC9c#L9(}l&Q3#N zT(K2dld+TNMm;!zS2v3rn$b{STC-`Po-8C1j4e__ zl`S21pOvy^H(bcitSzXq#K#Du)^t=?M*(t>1?EoshvqTMi{~GoKwNGO6Ce5-z(Gu; zSFN(2cYmVc$84VR-*V70q@7275E+rUHulnA#+9^)0^u18#b6LbkAk6b;gTIW{0OKW zoxTRvsv2fx_K>z_uUkGR%Cjt#-TjG_ z4@BCC!upkLhqA|uYf`M`4%~l~b~P)veX&t`pjyj0-}}C85@)lisr!e($i1AW23r&+Y72$xF0i?*+~(JweF;>3kcqpy z&>nbuYV6nLQu4(9(lm5;?j3iwVF zK53XmNcNOHR{6Hv1hURl?zM53y)!yx`m)ZIj>VCjR+XDlsr?GoaAqZ zmQ7B89#5H^?qhb~4;{8}JT+V!A;epJkQ^sQ*{dUisLU8*-q@QTSXHb-FCO2+nj{Yj zkqb;cE$tONxGbM@#uzYjat`PC{BcJVDm=2tGniQQ9xg5eqn~)!`*=<;srqg7#3O}Q zG4Dry#RpjayvylJ4p?BuWQy_C0hqv6Wsv@W*~# zk`EwIoG>nN?o;xP)R1`fyByl`$TX5uG z{&1y3s}tmNLxZZ+vSFp_=Y*aHqlM(GnBIJPfxK5;;Oza}))a5qd;3P$*xiE4< z*hw4W|2~iT5XS_0 zRL;=5I3ObNl?^12hfS2*@VVlLmAkdX7$F*e3%#|QY##rlDEv>Ao%M?wy7qAwvifva z&5sW$2hmZYqxELTFgszUR(Cu12s>4=MS0TFm#M%IRY)2FyCDJ@po0100FUnhIn@q9 zbwPX+QE4s6ywCAK?NsQ)Wca5|*vgY`L6z1lgLgh|i^BoH)r^KqTr{Z}s;Uz9_4x(T4 z6EK^`&PAqYv>FugR+d{&KdbPP_xo>Reu!lSg-=?)5f#!6N{c^n z7@;{_7H#fV&LHC){GQKht^9TSjJfOn>R?OVh0pZ%h@! z*MiLyBVsy;`$<1WvO9P^h+JImF&Vpq!1ql%^kk}%K#_0e9t#&-ZO1dJB&uxSW{}m5{Qo_HpDcT6)Go1~LekdqyRjs$tWs<9FG!u~}Ky($UvH zC<#(dfJd2ehM#r|B`F6J7J2XoiSVcL=%$kjjAAfxQ{RJ86luOK;;1+Ixfjh=(KQA( zdO<`XgMlZ=YrofUDJLg@(|W`sgCZk+d!sgG*M7t(U-51pvfIWuL<BClll9oO)?Pwy?3Wm98Ls3S&PWja^z52$K9 z&YcKpTrW~pKYWDR@xw5qC*VxdAtJe94rlttB_-juqkkEvN|+B9WQmj^=z;ezG1e;) zo)4VDjnXF?%|~lV6X!I{UC9!!pC}W$O&RF9O%aPTZU3r8bnTtq=0g;{3kxOI>7Xpk zFRc}1q9FMIdI!b?vS<~s;m4fY9X6CGLw?@&YZFLbQX#sAx}v8qhAu43$mbL7jSP>f zM&OS3v!^sF%f?kg9K>F*FL!D%-q8})%>^ti%|%?UUb-ce3^y#=RQED3ub|_y=I< zlfe4qqgkgBH_a}9fz8c)q>$IZ*-l!3woRXPy-Rzs%)D#FrpPB?w&PqW=fpEJ8X0s6Q6>y&uh5e4dM*UB1u5kL zY3aGIh<}?fa(8J-$IY_q)n;P|(v-_az7_=kr^tM;D+Ce9&x|`27T%JxijGv>oYOGo z-kSPFcj|N&S4wq6q?yfII3o1X61N3GB_c>3>Dh<85)cC*h)ntkDiQ~@OldIShvvd` zcL$%fnAw_MXy|2|$WAghwZ?d)$?eQL$U$TD0*`bd*>ahnBL1@J>IxrQt5Wyq!(n=u z!u&Z+%0gXRY({7jENe!s+=Xn_nmV`1%7$9T_Tt)6?CTJ^+K0$#w9aXTGgJ)@jA{p~ z0xiVA`3WvA13EI6U^U)1*%?7V9818ZO`Ty!(1$o^67LTt>EVSfLn_rQB_`y2#(M;A z=udl#HnPihF(or31m9_H(0M864x!E8S9r!l-Yq_wu@KcVDvKt#@eN;wq9EQyt0>2| z7tUc{-%N*i<*Gn?$xaUOE;aEGi+R`E4{%mn`PK`REpMjq6vnpsR=m&wA=~`M0@RWK zOi)W_^?r_Y${kIjmEPR8-XkKb2=~|TTXsTi)uE22ys!O|U747U_zL#KuRZ5EPVUY< z(sgX7Xxky)Lhb)V3FbB>c0)!~u+tRYEC%CtA*@H3aXb+qN z`|NnRjQ5?h;q<9_AAy1kXdE@yr6wJKeLY>!GwFJ*=)hmb^U(sjD9^hv`FmqOskJjIWt_Df(0lhu?41~NGx4hZs zYRJ1hQ1<3Y+iV1+&P027gK1o0Z@qD9h`JxqE3WmQ5Wyq*0X;5&mi28(&t*j0TN1o> zS*EVTdHqMXuR75EeD6Yaz%ZUS;xknb*j*drou`oZgU+jr;^q%T_jxSapBDmegVe`? zu!SLe1rzw!iug;yzDV{)TU#*smG$$#%$&p+07Ohnstu} z2WEWt0FKC*1Kg6w{h|~q8e6&|fR5M|*w2w|SopM5ebLjrBv?6#R_mMz5d6e-@skYU8i=OW8m8w8Mdg7>d(2Mp zlwgiJgOL^QqYY=$D&6kCfQ~{GGr_0kK=lldgc6-#I$E3RZIwbVYyCLgye7OsAyDBG z#ckAn#D0lpswOi%N2U^wCgCPeH< zFqCTvRn~oc5}@QpI`_wI!i@sop-PB;8|?h%+-aM3sONqpfgD1|_j&^N>bGn@TCV8n zoShP!;R|HzUHi-L;M`%d9(ecF;26iV?xI-MVe;1f=7?!zegY(bAFZFs&Aj}%kB9{h z$dyolV^mpg3!m3>*1PI>Dn?(BN+^5JMwjfEyrR~)Zh^kV9oA@vImiOD5V*HK-Nd9G zRP}ZGPqG^u-{&+Zs8MK>j)kQs732dnvaOQttdr%`jaAY3U?P6v>g@|-6o_Ei0RRUdnal&>pDmWC;DusG-!<3!-0L2!Pm?P66^sle z^n91maH8VMt-R;YJlgMVQ`CIo#2HZ6N|6Eo(EMbpdji?WR1!Bn3Geul^3oq3akqhU z@Wk}2yEKBYM<~wjorrr=u@^;8SzvZyO8X!%xF|jPTv1(}@_2C8?c#9$e(d|<7_SQl z@!43j{R&(uvqO3c%R<^MT$q3x1Mk&BDEG%pYogUo;lekzrB!>AKOz3Vn>PujH&DMW zkHzNdDW+d9uE}u#0GdCS$1otV4kTcls+%5xGVV|5szGaxX@BdOI$Bx=N~@_+;o;SI za&bkxfmzF` zMx{8am#q+i$0>KMcdcjlUi}`A-yM+o;q_Y4F1H!IS5P}N{b)MVyjL)VaFKw}SsT{s zjs21JP`2NQZIHJJT}@Fi8<>J62(Gua1(de+px&=Dc6%6cQFMFkTt6DQwpVzP_#hA6 z307U|CwWo$;6AJ|b&EpWCwdY`opiGfLo?H_!x+2L_yq0nPTaITDUmY=yqOew+c4r zko$G>1)%ZqKyCR>=d+ZhsC<&8SKCcYuS6S+$XekZ{id<8lq@#&LKnoQ?+D9?bUHnP z;YB#2HjPV9JtUIl;I09b?X9&-spK`uy``YSC_Q2u`5FPI7)pH;;gv5!Rc%;sbyDZX znr}iGhJieL%p!&7m_$@vM8oK+Q>)EDzZz zzt71*C_<7qJTZPHD+w-ZRrt$jZ6IluQ+Y=gc)b`iuLFC6;TXKEW5r*POoBb&VLp)z@|KY_P-*rCgPeWcCf^HSaySW?uD zF)0cNdkUy2PfctA1=EGBG%%8VHA$ikP!E@sWYEX2^AmH%E~)svI}2s^_t9acQbKV%O*D}umB(_>5#z|ne(3WR5`mB>KYgKt=36>=GFcmeaNe`@jGZHJjXZP;r*0y!! z0#%_Yx|dgA0&AK$0R_CL0-ZrM|tB~x?bFB?B2q|9;Dk}@eCo?!NlEpN*vA}mFj7f5e;829ynCMSyr@8xat&w*uI z$e)TFFO>M~!tU>)k2zQe8^YtStIpfVI+FWh5iOfLZfr2)?|%@ZKI!rnY`KkLbm-NG z?A4y;;ynF$@U+vc2??us%QUiAE=7i$Fm-D>@|~GpT^S?KKWSASkf``Y0-x{l?b*a= z&B-L|*=L}Of+6Oyc(9%>PnLX~0xuh~h_{XE`MuqA$QeXvI3;1k-rMks^lUnN_C^uT z+Q1a}a7HZ$Mk{!WBG;EO-&*OD&1iAHvYl>tUo&x+iCi7EBF`Z;A zUAsaV9W{+4rXv!Leskjlfd#8)qj!gVO8T6kfpsc0#%QYBv6>&XJBY;HNAC14S6})R zn&7%L-Tl~Q#xrzBSaFHLuu=QiJY^+L^%9Bxed=(l>-f!@f=ndMTH8^`DvpwF+S``G z1P?XX5U|JaAgma)SS@HH)-O_d*sngZ!daf!||P}4eH zi+29aJV>bTU)VWV&G8=XW&7YmzV=Z$6>U@66JJdxRHY76+|6macFylCAnv=#lw3&nCs%cuP6>| z9}xT|EN1OICTQQdth;u0n=t>OUAfRF&@MeV8t>zKSupy&%&^fcBL+0Ov;Z3Up)Sv+8EnMHphtJRK*3uqo^b^ePul|Y96rm(OIlPGFxnwkt_oSjBCEvQff-< zTA;96?J{vc979<(Kp~voA)ztS!>>Cw=kfkk)U?SIh|&E$kGT)8ADrolA54z zO6*AApNug`9^ITLYzYg`iQ3x2+zZ7i8m56S)L5ONz90+paxmK)i@us`Qr*IKPG!-8 z`n?4;kA=R0vjkR2TCZYSuSkkrcy-;Jx<%D|GIExF&UhgCR>vfn?aZUe0Q#7gK+144o#k-#T3MUL2Nj=~T=5CUDLDYy;oo+qw-Y+;|9%qjYya;A!Y^X_AQQmIzi`C~#KJMOT)GPrQfV2~AOgSZNr*NnzC0=dvtD zW0_m@;{8OXnh(js<1z}hW$Kn*u-I{Wd!$OY%cqvhwCT0MiZ$Q-<4x>oE8_#8vAp3R zX1h^tsvEW$Y(R#AJFN*iTGBSX>#)y9rrF26A1=>~oy5+eZt=AcB+fq{*QX4C;|V;7 zZe(9b7?5sivIniPfP2~jfp1P){oC6FVUY&v!`(DLbHT|#DeJNm?^OrZXFuqKcK92eP3Rs?K0Ao%-?ph@>2Ch7tXk&sO+UnCho$Y zXQQcjBFSRqitzM-^}Vj^J>B?b)yk%YU0SGkjEH0cTrKw!49AZppvGa+t=}EC(K>t2 zrAGX@QzLk+eIipcijMF#riU{hxq`vCj^F&HWt7Rfn4IT(0L9A_=N5OxB4h8zTw(n) zuDpq$BR)#30ds^6MCVfBDe3auvRI`Fi z7BM+2uV)`4GF#R-fz$4g#${|Jr$WO153N=r>J#MG~{K+~63E5t^+6%4D{V}y`nn{8_>7u9)}bKcX0;};oYmcL|+(ciPtfI$4Y8~MDO z;|Z1@>%tZzGWf?Nj<;{WfveV5*w$B?^sp`3gLIUOnh_wCNX@v&xfUcfxvNf=*H8tI z6{p2QjtX#7W|#w0#fU15mfWh*9QTg4=BxA=-4BTP0$X*f@9Kt>>-K37>3p1nEYH_p zy!=vYBu1*ruQKD9+yWP99gVe^GR8ba#yF-$`Eul#Td&0?Zs$|%y|Y z@}crBgh~@XRtc)xSPkFr%9-1bT;JjwDg?QT4B1bdGvKPmC6_9I!T-qIX7;th2&H15 zXQ)gZ3)iX_F=bHls4A&!V*yq%t3J!ggsd1i(Pvye|fO) zf9C+~1-mOWY!;Cb2uDjg!(!cyH3u;X&FSn>b^F3&jXQY-%)U`|ZIRR5X$quW#`?5I zO5;pCoD4N+!QzVzTUS>u;eCdm!d#7JzE~0Gg3$hb@@T9V+R5(q*VW*aRKM&Un<99w z`htG_cjHmL3wQQ*K7&5AQ|GEWO(Xrg#Fqx(u){_Hw|5}6lVzt0A%2Ct=(k`^+mZEPxX^BA6;p|Fk)kdgr_TEDT<{0H|wgkST z?)jT35Z}|g;`3pIry1MKYyx7c`V;nm(!g(|72sP!Ttuu?qasUV#3ozIo8E^q9Pin& zii%q{dJ4~==>m>GqcUZE+XbU^NHY?uUxpD^K4U_p(tmZ>Q=6=nY-^l9?@z)ZPhk(` zHRU|d9o4#oTQYjBL_UwgC5=3g;8TiS9nbr|YgJ&i{-8V9=X=esWpKMR5Was zI>a13w6i6$|Mqds`5+3$0>A1EoJa2^IDB`v$pVC^I_Vn*Urr+~ntkHpM$UOIK&S2J zHT*MhmD%kgtIZ%Qu(IqhuM_lParTMzE7P34M;LxD=@pK)nM__$XN$6)vo5?;`9*%{ zN(*sZwcz4x$ZF{^Ogf|gA zq*sN76%ojTL>~mF%&cq@ni2JuHLcvbl{Iwi93qerPgX(k&}hCV4)cV`1L_>C2is&5 zUvh0pf1-D{xj@aQoYCAv;fwu&!)VOF6SR=UX7)zJ9d~$}o@PkREie-Ev!HQla$oX| zaZf>IGCbwbMz?Ww!;>R3+Y27q60KR*3E_02+SE@>hnJwHVKqLo52kj;v&`O@{EB&T z(V5D)sxa&Aq)n&Q`>4f@t#!(QA|{| zUm7~_6Whx)C~G)EF&tXFDYDYaD?Sx-hs?&dip2UcLh~$ zdCPu&NxSEC-lV@S;eu9zSx2_8OK2kW`wV_pxIIt^TFBAqjB-!oMbAIp{+5Y;yegg_ z$mPIyYQPl6G?>)P0xR91up_S^y-DP)%vI{VsbgTS-vZP}4n)b`Q*MaVWp_uy!SQU) z@V=AJk!M?J19{)UXnp6_k;hCAR5V`kbcZ7gDgo@S$PpDT5ci1XJ!2s$w}o#lO9Ad3 zqIvM*+P{JQ{asg0F9-dn>xx}L2^Q_2uB%wnPpfhO06_NjO%4F9cT$2nG%!_ZrIpWR z-^w$y$}uz7va-~(vMbs;BO-yp<0RW-V(ggaQ6*q3=VMdmf6qOn+B34pjN6Sf+Rb13 ztLgFV)&A#QKf3W(+jw9A01g1e+=T~Zk8HN?W=0Tpd67W<;(i}3AXx`49FL{_h7U*d z3ngxh<@MFkJGgp}fnmud_K}ws5lP{KRM=xe&j7L7qz1DuE!#9Gd56Y713kVBvjpCr z)~+TIN04&|=+GAYus*jI!brp6=Ak2xpm=~PZ>%#SGnd6TRb=s|iXVo|@}{cRnX$0` zVX!M<%a-zxuOKLsT>WJe8Ghi#lN0D)^K!a1n-C@$7^Fi$C86Z&m-ijNF`pPtZsm$i zjXv%tL^9i>?REuUe5w>Bh1>TG8464GrkN z{D!KB4EFy>b;dDZzn@B(62bnKtZj%OeoImI+<&Aorzxo4SgyyzAB`-p8JORgTfoF0 zOggCekLAa&1!%vY=BJ+j!ICr9|43fhwUED`auk67nS;0Z?9WrCGOa)MBFjJi;Z0YK z|H1NW6#wv^>tZ0_{t35_pZ2>5D6xrO;9t4Jq4e+E`8#94z%d~H+f56?ZD#vVJ1t15 znfrH7Y)tdp-$G9_`)}d7neVsop+)+)5ZPk=TL5d7`Yp(}s{R&ATeT7YYM%vRwaJkD zgUEXkn(sgY0JyI%Fu&&N|GjFyJfMIk+c7~&ZM46Ld2Y;2DzBZl04M-}`2Se1W6c)^ ztY1V1ECdEQ$^S<4kM%M8qYz9O0Kggv0HFFysU7E+QacX%zm)#l3<&^`{H26L@=NKb zI1DJG9qpG}s~u8bdtUvIgL`cV{)=qK_DbgYMf&UF_b-wU*Ds4QNdMde0P*&_Vu-_}}sv03iE|2$K0lJnO*!z5Ga>c;F7ozcBtzmS4&-h7$O=umFH0 zG5|pOm$H}YFJeavEJ#lQ`d_Q__o4M)&6+Q>y01qV3(3Dvp8qA8=)ImpRKIc`q|t@P zZ=nwW*w~vgncC|cSsF7LTADNc?Fr<-fLGcRpDkq^rgDmy7=> zu<9B|=D!m6U*9x;DNq`LWc$fMh`KN!-fo27UPyK0fukD#!b7{Mf4SdMD$V*&m$%>R zjC_9?KsWnEl%{|Kboqrx_R{^vT3<12k6(80oe)5_gMaLz_Thn_`24X!{>x!aSL?*s z*F5h>{r~g)&=>U0iw4x(_eXj7RkkJYq57Axj&giZ>d>FHD;>fECujg| zR`LAH5F@6UFU+gA&96rR>tBY>8bM6MAAhZ*NFD4Q-|K8cuV)3%UnG&1*EJLWpR;^( X7!!P@9hB8X3c?*hf!6K#hyMQnF6IBZ delta 11158 zcmZvC1yo#3()Qr)?iSoFxVr`!BtUR?2re0%U_mChh2ZWI++Bwd+&#DjNC^DAklp>V z`_DP|blsY#o~r7u+tbsxLvvv&Q((~5lwe?y0RTiq0IABJY&<$G;y-OSbi-1-kS<>8 zx$BuL6yU!=MU=2l;8FVL_L}{v1v5ddJqfkw(5}$Wsu0+nXO$yd?z5^E4-p$4>mPfw z%D`r0XaE2c9spnk$HU@-RnbX635$-?;+SDpV189HzVRGZ)?M+O5BlHXn*i!sA77H@ ziK|QMN2CucL?1BC#UBOqY6eP&^ul3uN>N$Ps;0A$M4(KJj*p97cpt8U-rYk72nwr- z2kN0m3#0^Jk=m<)V$)P?m2Df$i$tC$h1flkf>o6`xpOF`EcQ1BPv=(e>xyhAVR>O>#ioIAN)L6{|i z$L|wo8LQ6yUChVwo%fy$R3FFAv_H-JvA(-L(#XMN$zhefW{=!Pa)yK^XFd z8YPF=3h(c)ImpDgKn4xX&vY0}f%Ok}Jp9|HI0%{CEhG|n4@z(kAY&wrIwDy{!4z-S zv%cQslnI!eL2H6jw`lL6`?NEd_v)%Klnx)cY2tJlx$9;$ZG-& z0KkC?%t6TjLh{3%^?ndN;w+t^G?tK$62OlWZ|rqIA<`$X#8EH~=|q7ShJ48DD$Tq) zETc7{_rS(pblG^pBjUV3=e`M{b0op0NgYhNbe+Cb2Cq0ye~VZwPyw}_gxz(dF3%|H zp`OQPulQc9T(|%7T0O5n0p0RaFN_le;TH0l-@GLP-Ajf!C7RwS5#1|_J_yR2_-N)h z`M&SCO{7eOcy1lC65Z3ugOor7c_)xngngu+2t@6LlVb3U(hn~_J(Nv$e4OflbTD3? zK>frBz2}y9zci94$$Lop{eU$2Fnk9>{tb$I2}zeI`E)Yk3&Ks{%yGe4L7y30(p3oO2 zd0&CD(*??Ul(RNJxq#XA;MoyO4KJxULGRq|cuMz+et;A5=?=gN$(48Z2rTPJXn$cMGZz?g~*JNP>>Y18L1M^qQfSPo5t3WxwZeXp8h# z1Leu4$E`ia`Esj~=t;a*OZPQbXp1kp8|n{ok!dnAexDZ$0$w~mMGvFvA94{=CEKj( zOn*cXt{g>gi`L1-ns!BQ;zp|%cqjmm7?e`laQ2@ zK>2Q^`LhDOW8zC0@^s3q<8?WN#&trzY(R}Pk#F&;2b)qzU9Df{LVH3vn(DN(Nmb1u z{kcD(XmUJJF%I_KvXj1&uINRY?0wmX`*hJrqR_K4;nw`aVavx?!EwxEnUl^bnP@gO z>6%!fO^jPKPhXCgG!kyqgw;j47WH=A5#)2kNi$Yg!C+CXLJf5s+>=x>@6#CHkb2C- z)aBwCCLtF+6`emHmb#>~OT5gXFHLV6x!(P@pm|XiE@FeyiAaj*mb?yId7JHLo?{~_ zc?7h%g{1T$TYzR0&SD!%W9nCHE+k?kwBcZAk7bQ7Gf{B?pQwf1Nwl+)1=O{|lR-GL zitRa)P2pJyb~d3cDcUjLJ*aAE3Xql?zKzS(@>}B<*VU-8F8lb*acNY%UuiOtsKYx( z%b?fufr=8c+iB?rGVP+r!&Mz!6dxrIDm78VyfuevbdO^6mLuJ7OS91G385!&JyF?H zAa(v&+}rutt&>p2)}tRnz-&hKlNvOtmG9nSr9m-1M?GP(Ow0$dnG(n_rfM*g#75A% z{XIT9_}+2VGhPwFA0Gk-Qh9iI%&U>yk1vDx%T*8AY3&lH6%jl4( z2u+S#Q-HwK#a@@>rP|7?AKPQ=OYi9kER}PIqnzANoyUVH5pg$#y^An-nZA78nbe$50 z)VlF6lz{sZTIksZ{Zh9CV7i@HzR!pDpYAIxrVTIlOz~DUP=@$yNt+_~%UjsTP1d`( zY?3BO!Tpk4pxTc)NT|f^vXckJ>EIh5D_Hhl`8leOZ*SDOa$c2{vQfO$)SNPh`n@BN ziu~gmZ+G~raZA{YewXveD-AU0BH5WGy>tFTxN0&~MoE=|7F^C{ zpKLNfT6`o~;#Dva&y?130Vf!49ZqR$B>1@-I;*Gt;#-qrYMg762T}#7Ab(Ae)6=`r zV5QMq3fIRMeI|?cf+`tdG@7@pg}l^qJq=DdKkBkoQO8;@KfBOKZQtzDC6T-!L=iqy zYv0C4HearZgeUnjBwk5`DfOd5`H6A@C$ zyF`pLlT$;6wC?LtzYtMSEKqQ_QI1Ptk#P2cF?K)a7wJ-bcQe7fHh2X;BHc$Yf#(R( zkf_1(1&y97vT>5AeR={^8jT2;(QohUy@RZ595T(%yqEU$1=Ncd&YEKksc$ba&@%fP zhGG1GgcQJALJBKEcTKvQx@k%i+5S}`1SYXg&aD}NlcOpY2So#r?l#V)cjBw_V5aPZ z85_}D$gZ?q7ZpvaR%_KyOpX09CWtWB^49=ZVd0mKpKZN<9Iuqu^h*U<=x+1(@X#>D zMy@5IoNRN-!n=_`wj!Ng@ap)}&=kzzt9wVIB~Czdij7Jk(*wz^+{o=B359hdh{i!hixkM!-188I{$jLj7Ko@RbC%tf z^Ojs!NG!a`7>+&L=Vp?2HKKZbBC zB~Z^^Qp+Spi|?byQK4_0Ds_SEtYvb8PadJ3g+=18n{V{lNc-k>qFDDbG6=Z6dt=4~ z$`A|W=&Oy zSc-gAz^$|o`m~~MgssMJZndTOl(k*HOJk6J(j!^46PNzTV3Z>Njbwm!tg` zcRoZmoq%dKRvS9Bo4_Xc>F*1lT`P)N&(%C7q39Y&_mNM=t{9jJ92D^(s3gVO_GohP z9=XLSM>fAVF7nbdH&4g^;BDBg=$0gK_pBT>Qiv{R~WSrWOMljnLZjzX!(-~AvsLxHl z+JvOYo@q&%;UIn?&WGqSG$phP zN?Cw(|0;-VN&1Qa()A83h)N>$T}|_IybELW$)S(WabWZ2FL2{ zx%gmke4N(yVQ-_zC@(wv^^aB}H$p#4{4&k$Kb zg;0BrnCGOqY(AgFSATmK|0HXlf9+4Wq8^lvu>ble;;H|~y!?&B@wan~@dfO5ft6<3 z9G;_){y9{cpT+?y1UB>qW4~1@kIsC4+%Wm!tq?Fkfa{A>I~czTZAN4?NP6x|fo~)S zCVt&OVw`x#IXk)}ZYY&~kw>$`OvcriK_b$gCjJ;py0H9+DSCPe*?`4&4DEqzlNjso z@EmeqK`^9Rg(xp$jcE&x22mE`@pu6C)P@R%!k)VVhj&GrgkoW0b3F~5iNWh_+1MmxiHFYPN4^iVP1q0!<)AE+t`w(;lZ&&D#cWbq0#eE`&vzw4CT za3yTUh#HA&3o^{h&R4a%$9A`p0&VkXnUV(oK(jtH(FXOkeu(^EtFaU zWfO-|xS71cbxW4J^!aSS7CTH-ZZwc*ulZMX6p%52XL!J6kV^%lQwh|;sl?fI6EW9X zOC%^Dh0$<(&|^3+i1%Y+(#GYQJJ~w*;Kr(Z_)3XSlu?=@VJx5)8KJ@`OuA5wlf~Ie zf|+XsG%C8CvP)#;QSS~=2wszwXc|LqQXmW~3bW(sW;_lPT0+sx42mC@RHEZCH+e_; z)4O|W0MFQmNHBApJ_lrL{*}lFu302>!6UK#(0m2OTf{IRswMRbx~9Ep8M)~vuMb)p zMz=$Sl>Ule;$E-7!o0nsKfdnz<5zCF9w?8i{i2(=ok0mCJuGr*2u=W;2Iji5;0q353p0y16 z93!bfN03WM3CbFE@Aj(H3-F znF3kQwazJw|Jq8b1#zRmJ#7Yc$xseUJ^ll)#|Hoy{%i&zfV2A0L5aGa#w0UD51BM_ zHLfo2?XA`W5TS%>EVz~AC9Q>_3k-EtHNqSAUpLPYm}l~5a_!0MTkEnqC=g7-k?lKL zN1!!{D=kh!+OFA_eJ53J{XMPr^9&%T;j1mzD}10;uUGF5hY-l2ohz`BH@|Th;&qkg z7IoQmH?cBWYga*b<28I+9N7%1@_10Hflgy8=Lqr|Q*fkwq*6`G<X=~&6lTH z3@$9<&I#M$vgEpFb*?jI>^O0_hPA18(FQhX)A*BOG%%?#lnr?@p; ze-%<6O#@jnwMhMdcR-%0IGt8*)nJ2aKn9f#vDa@@R27i7&sr=9YF2)&>;LU5S5LdN zg8(BBT;uzAv;6%9paPEfPuEd(PhWq+Rm-pRZi}%XgswGJ8GI&dJAXk&l5b>#5zAWT zI%N9S5(z)^FwAQMt=<&aOpCILeTtAvqh*K)rW#A5q(*}hv<_L11Wx)cX?$m`;@uZ1s!U&iPk!);V|04+^A}6$gURS)xK4!_h=O2 zU>}JjdX7c*(kSemq{uKo4KZPA2qcn->!5? zvyTuHCnw?eV*u|k#jr!gEQ>SEhfC)^lmeqcrcsgQa|IjogqkL~Mctijcw$|zidi<9 zYuL&^mEI7M>zuhR6>aQ(t_U*Kt)5cm8I_8aCFe=9s}M-c`EpRLuE5`DOVT-y8P zZ@mb zv+`M8Sd)u|8TA(-SQMFGQGEPjwF}BxpqpxI}%!ens%Pa$Sl4p?JgS#*^?t{miJi%zqBAAIx+=AkGk3@|EEN z*PfVXQ+4)}zz_2~TmS6RhPc^jQ^zoPSj%xz50(ne-wJUHatzl$AyqkU6lk{|6CmOH zQp<|<>Z1v}M3;8WJ~*o{ZJ(3gXlOL)lm|t@_!K{kJ4cB%rM`r_aw;(b7PY@5Xw(+P z4Jpa_?wYt^QD}z-oKZuhuE3|Vc*j!SLPC4wL36?-i*g_8yAhA)7u6fVo;Ia0vZUz~ zIis%3BvPy-!}ihlLu_%z(T`sck`mAzmm^br;nJuAI1sA1UWK|B(PmufmBe|Tqe7*H znse{V-OsJ-HgMM|joN0DwVGW5?~0efX^D_Z#_wP13{ABA4&J$FoeQ$#?pOQKHo~`e zWS}DPQ;P4GF`DSR+$E#*q|j9#30r28ip?V|aGD@D?*zt)ul5aMZ@!Hg6#5-#y&tj|IZ0V+93N$#P4&ik zYbc*N(e-xdOt=D`Guxmu@->55Z4r@D@6H${Pt-y+(Wq0p#7z~AC&Iw(7L?wp-v=cF zHtP@Qx7e&@(9y}}B#1)5pg0vDv!D=H9nd4R!59Q#*4fNe-wEQ{$&}<^|6!qkBYN z&_-x%`XNPL+>G%N$srkrf&5$>(wgdi;FR&kntlOOUVktrD=4hGT8y; zYS=SPoE4I}hH{7?j0&=K7C~06WXQ!zY-|eee`iXyt}hU^!U^5#{VC>$VSmR629+Sg z+XMl9sod{V(%p3Z^~Ja$c(6D?tpS!2D?hVnXPjNj5*(OK^smfH^(~MgF&m;79#4$? z(bW26ke{Eo!(;L`jrF zlB^PUM19Lbr7nSD1FO}7*g;mwp)Y6kQf!}>LNOqpbZ3~Y|Kj(DBtk~R@4d8P$?JmW zvNk`iU}=HS;u{m$C|NKEnGo3Sh?k)%`3s>3et7t7jHsLN^j~>ZK`wAe{n*Af)|~jx z1IFhJw&3tZfd&f-RSpE-;qGPUmCn9}L5&2+NGmxzxi3 zj1#Gl(iMiIOe|x)Tp&@J!XigKo22C=uc~HfcRi7Yx#{XMVtKR; zcFjyxl&%Iu+TCeHJMOjOL z6i9qxGSxeCUeYjaCMK``C@opvohb{^cFVldV9uo^rkomYG$Dl^>k&JrgcWN58{<+p@^ylfOai3Dbh}_cmZZ|(H;*fvSbZR1`QH9 ztWXrOj;2nKD=J%l#oO#nwC06YB9&3hAV03tQmR&onhD1}2(cCTeu1VHL%TSJCB>JR z62LD#XwQwM983wtu+1x&~K9K6`t$O!@?mB+;>GuUJO1y{8Bo>i@xG0Ne1!YCw z?G49w8nb4KLdax+eBd;!)$)a4%h6yf3*qp$T$vs2T;C{PmoW_+tqyT0@eH6|Z)Xh$>H;u?WmtBWKN9$G6aS+CUMFu!;vb zkrpFf>GLORZz7XQaRkaOU!8cLwnwWQd_02^yxF>qhRth)V40jK!cS#cwW?=4Y`FqeWMbVrxEoUYIa#wl zYl<7DTai-h1Ylg;HQAnM5hhJjk?VOfO4*u#1Vgu(L;XqIoFxNOH&XHEwJD`49Ryhd zX97(~J^eD?X0wsFyG}ruEQb;)nfwX;V43VvM7PMlyb>+$!#t zJL(sU<2a$WR#mqz>5ad$UG>uymO|fR$i0&#N={DuKnzsnf}of7?b{RjWag? zvCztQ_@ZB+XG5s#i)r&$EX$m zEqXCW-C(2W)I@i5f3kPeyb0P`FyM{cto>;uyx+n6u|MJ64}rk^*L~*}FX)m!_Pat9 zH__z`yILpRfz6n%8KC9)Y$@)*5;p8|OzQ-uyaQdvz4*n^S}o{~u4SMeW^S4(@FN7C zlwqC{FCV9=^Z_E)fp5KfM9JiGIie$^q?031%5QC3kRS2G1tY&$VOTnj&Y3;`y!{^8 zhDFnv&5$Aq+d({F!5D{EG9=PHMbZ1CqE{9_!DPq?a5Nn_3|c*-bTnbUmn>x%c&Q%x zvKslwltQpq{Sfn|MSHE2O^ zAU4DU*Q$zS^b4Sz)|mvW$k@(X5ryQLOspD)NMF<6`oHJLcJa8?M>6ZR;j?v&x9Niz z69-nm_NnJVF=m9IO)V4sW;+~;_N`{8IgrJmg*lDaX65PtLH*Y+NuU<|pJM}2IzU7u zlE-5MGtsahb?}~%)T>u2>ZeR&>Ks*S994B(oK;-BTFxG5=;|C?96Z}h48s$ODr}4m zD(s5NLkcr0<3n3aOk>PLqbl{=(En%a1nH3zy1&g`>InQu#K=6vdvUD~G2SXLTqJ?0JbSt{4n`nf5bYbY_A ztxev$Zhy+J8Pi@-_(pl6OcatIrS%Xa6?)q~Se4as3i%SIcS+>;~Hess_pMFfw++_{_YzZDP)hhyJm)B9_=cW2#b>R>#OnUPk_3C3FvI?nZEcZBI5gHj z5|@!hYykiO*n7Mfz}3+&z~<(S&ln1ed+2A0%6jjQgl3lj^<*bTVEr(F004yJ001IS zc6h_2wWsz|2+t;cZsah}(ypiAABol%0{5)@6>tmnEZqcs{bMm1Ir&GI7hM94{!ehB z`E>+V*Z@F36PVqX5{y{~UmFzbiT!N(JLmLIP-l6of26O4^{~(1#*{(BJO$d&S@kyM z@mN&J<09la(12lhFv$dd?ONsMKW1Sy27g@d)QbNhU}~E9qx=0u^A8T#ngE0JPhk29 zv-)!2lMXS-|Ea(_wf|2AZvRzxP|&zA|2ZTEcIxE*_bD-ONvGiRQLnvDwr7F3i}zX3 z>=JnvK6a@*3)@|e&w@_3^0QFbt@A8E_q;*->r@!nzDM=>tM(pZ=s`>{R5u>@q=)fY zN7_pa{R{6&r_xJE`H$te>p>*m;|M#V0|0c7)$y-SUEl}8C*Ua|4!FD*^$Gl+lRN+b z<-d@RZC&6l#S{3rm+09ON*^(_F#D5ExR371_^-{%zl@c5o`9<8|Lk27KNGY*5_}bX z(jWEFJQ4h7kqH1$|3xsa@W(sCdC+`1K5=Ro_-z;Up-zle-VK6z-LCZV7&p@0FMrkJ>MTa-b)#P*~f{%6DGu9;X%}Az49P2^td5d!kmdkP3b=+^H2F{ zNFtFHJ=XN~V@(VH6+yGnqX7+A&4L7c_b(#UA!6uOQ*cftBUslOAAB^5O8F1T9u3;L z++#l|JrZ&LMN(w(NJ0v(9HMyw{`IK;1r)UcSKEt$+eZF`EHO+BZS3^q>CO%F-=4Ni zXXw`-y*NHrlh|JbL9S2X5_*yUL-OQ}+Gj{F0S*92Mf>lv>vVq#>y zUz6QmhHNE|-u~}UO%Ov&- +cd "`dirname \"$PRG\"`/" >/dev/null APP_HOME="`pwd -P`" -cd "$SAVED" >&- +cd "$SAVED" >/dev/null CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -114,6 +109,7 @@ fi if $cygwin ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` diff --git a/src/test/java/com/lmax/disruptor/example/KeyedBatching.java b/src/test/java/com/lmax/disruptor/example/KeyedBatching.java new file mode 100644 index 000000000..c64279015 --- /dev/null +++ b/src/test/java/com/lmax/disruptor/example/KeyedBatching.java @@ -0,0 +1,42 @@ +package com.lmax.disruptor.example; + +import com.lmax.disruptor.EventHandler; + +import java.util.ArrayList; +import java.util.List; + +public class KeyedBatching implements EventHandler +{ + private static final int MAX_BATCH_SIZE = 100; + private long key = 0; + private List batch = new ArrayList(); + + @Override + public void onEvent(KeyedEvent event, long sequence, boolean endOfBatch) throws Exception + { + if (!batch.isEmpty() && event.key != key) + { + processBatch(batch); + } + + batch.add(event.data); + key = event.key; + + if (endOfBatch || batch.size() >= MAX_BATCH_SIZE) + { + processBatch(batch); + } + } + + private void processBatch(List batch) + { + // do work. + batch.clear(); + } + + public static class KeyedEvent + { + long key; + Object data; + } +} diff --git a/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java b/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java new file mode 100644 index 000000000..fde3d4c6d --- /dev/null +++ b/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java @@ -0,0 +1,62 @@ +package com.lmax.disruptor.example; + +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.util.DaemonThreadFactory; + +public class SequentialThreeConsumers +{ + private static class MyEvent + { + private Object a; + private Object b; + private Object c; + private Object d; + } + + private static EventFactory FACTORY = new EventFactory() + { + @Override + public MyEvent newInstance() + { + return new MyEvent(); + } + }; + + private static EventHandler HANDLER_1 = new EventHandler() + { + @Override + public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception + { + event.b = event.a; + } + }; + + private static EventHandler HANDLER_2 = new EventHandler() + { + @Override + public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception + { + event.c = event.b; + } + }; + + private static EventHandler HANDLER_3 = new EventHandler() + { + @Override + public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception + { + event.d = event.c; + } + }; + + public static void main(String[] args) + { + Disruptor disruptor = new Disruptor(FACTORY, 1024, DaemonThreadFactory.INSTANCE); + + disruptor.handleEventsWith(HANDLER_1).then(HANDLER_2).then(HANDLER_3); + + disruptor.start(); + } +} From 1d44ba88e728494a53546b2b097bb0fcdfa6971c Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Fri, 15 Jul 2016 14:31:00 +1200 Subject: [PATCH 4/6] Add toString debug information to Disruptor to show thread state. --- .../com/lmax/disruptor/AbstractSequencer.java | 11 ++++++ .../lmax/disruptor/BlockingWaitStrategy.java | 12 +++++++ .../java/com/lmax/disruptor/RingBuffer.java | 9 +++++ .../com/lmax/disruptor/dsl/BasicExecutor.java | 36 +++++++++++++++++++ .../com/lmax/disruptor/dsl/Disruptor.java | 10 ++++++ 5 files changed, 78 insertions(+) diff --git a/src/main/java/com/lmax/disruptor/AbstractSequencer.java b/src/main/java/com/lmax/disruptor/AbstractSequencer.java index 452ed3193..50e057063 100644 --- a/src/main/java/com/lmax/disruptor/AbstractSequencer.java +++ b/src/main/java/com/lmax/disruptor/AbstractSequencer.java @@ -15,6 +15,7 @@ */ package com.lmax.disruptor; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import com.lmax.disruptor.util.Util; @@ -122,4 +123,14 @@ public EventPoller newPoller(DataProvider dataProvider, Sequence... ga { return EventPoller.newInstance(dataProvider, this, new Sequence(), cursor, gatingSequences); } + + @Override + public String toString() + { + return "AbstractSequencer{" + + "waitStrategy=" + waitStrategy + + ", cursor=" + cursor + + ", gatingSequences=" + Arrays.toString(gatingSequences) + + '}'; + } } \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java b/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java index 6302953a6..52e2b3748 100644 --- a/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java +++ b/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java @@ -15,6 +15,10 @@ */ package com.lmax.disruptor; +import com.lmax.disruptor.util.Util; +import sun.misc.Unsafe; + +import java.lang.reflect.Field; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -72,4 +76,12 @@ public void signalAllWhenBlocking() lock.unlock(); } } + + @Override + public String toString() + { + return "BlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } } diff --git a/src/main/java/com/lmax/disruptor/RingBuffer.java b/src/main/java/com/lmax/disruptor/RingBuffer.java index 86d946e9a..a452429a9 100644 --- a/src/main/java/com/lmax/disruptor/RingBuffer.java +++ b/src/main/java/com/lmax/disruptor/RingBuffer.java @@ -1104,4 +1104,13 @@ private void translateAndPublishBatch( sequencer.publish(initialSequence, finalSequence); } } + + @Override + public String toString() + { + return "RingBuffer{" + + "bufferSize=" + bufferSize + + ", sequencer=" + sequencer + + "}"; + } } diff --git a/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java b/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java index 2c6ea9725..9c81ba7ed 100644 --- a/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java +++ b/src/main/java/com/lmax/disruptor/dsl/BasicExecutor.java @@ -1,11 +1,17 @@ package com.lmax.disruptor.dsl; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; public class BasicExecutor implements Executor { private final ThreadFactory factory; + private final Queue threads = new ConcurrentLinkedQueue(); public BasicExecutor(ThreadFactory factory) { @@ -22,5 +28,35 @@ public void execute(Runnable command) } thread.start(); + + threads.add(thread); + } + + @Override + public String toString() + { + return "BasicExecutor{" + + "threads=" + dumpThreadInfo() + + '}'; + } + + private String dumpThreadInfo() + { + final StringBuilder sb = new StringBuilder(); + + final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + + for (Thread t : threads) + { + ThreadInfo threadInfo = threadMXBean.getThreadInfo(t.getId()); + sb.append("{"); + sb.append("name=").append(t.getName()).append(","); + sb.append("id=").append(t.getId()).append(","); + sb.append("state=").append(threadInfo.getThreadState()).append(","); + sb.append("lockInfo=").append(threadInfo.getLockInfo()); + sb.append("}"); + } + + return sb.toString(); } } diff --git a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java index 4b70e6598..eaaca7240 100644 --- a/src/main/java/com/lmax/disruptor/dsl/Disruptor.java +++ b/src/main/java/com/lmax/disruptor/dsl/Disruptor.java @@ -556,4 +556,14 @@ private void checkOnlyStartedOnce() throw new IllegalStateException("Disruptor.start() must only be called once."); } } + + @Override + public String toString() + { + return "Disruptor{" + + "ringBuffer=" + ringBuffer + + ", started=" + started + + ", executor=" + executor + + '}'; + } } From 0a6f576acef3331079178e4b69abe8ac6c754e47 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Fri, 15 Jul 2016 14:37:54 +1200 Subject: [PATCH 5/6] Add toString methods on more blocking wait strategies. --- .../com/lmax/disruptor/BlockingWaitStrategy.java | 4 ---- .../com/lmax/disruptor/LiteBlockingWaitStrategy.java | 8 ++++++++ .../disruptor/LiteTimeoutBlockingWaitStrategy.java | 8 ++++++++ .../lmax/disruptor/TimeoutBlockingWaitStrategy.java | 7 +++++++ .../disruptor/example/SequentialThreeConsumers.java | 12 ++++++------ 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java b/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java index 52e2b3748..3ab0a25f4 100644 --- a/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java +++ b/src/main/java/com/lmax/disruptor/BlockingWaitStrategy.java @@ -15,10 +15,6 @@ */ package com.lmax.disruptor; -import com.lmax.disruptor.util.Util; -import sun.misc.Unsafe; - -import java.lang.reflect.Field; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; diff --git a/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java b/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java index 8ca309075..3731e038d 100644 --- a/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java +++ b/src/main/java/com/lmax/disruptor/LiteBlockingWaitStrategy.java @@ -87,4 +87,12 @@ public void signalAllWhenBlocking() } } } + + @Override + public String toString() + { + return "LiteBlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } } diff --git a/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java b/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java index 7098fc1fb..4cd66bcf8 100644 --- a/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java +++ b/src/main/java/com/lmax/disruptor/LiteTimeoutBlockingWaitStrategy.java @@ -80,4 +80,12 @@ public void signalAllWhenBlocking() } } } + + @Override + public String toString() + { + return "LiteTimeoutBlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } } diff --git a/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java b/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java index cb795063b..053b53eb7 100644 --- a/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java +++ b/src/main/java/com/lmax/disruptor/TimeoutBlockingWaitStrategy.java @@ -70,4 +70,11 @@ public void signalAllWhenBlocking() } } + @Override + public String toString() + { + return "TimeoutBlockingWaitStrategy{" + + "processorNotifyCondition=" + processorNotifyCondition + + '}'; + } } diff --git a/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java b/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java index fde3d4c6d..0273ff20f 100644 --- a/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java +++ b/src/test/java/com/lmax/disruptor/example/SequentialThreeConsumers.java @@ -15,7 +15,7 @@ private static class MyEvent private Object d; } - private static EventFactory FACTORY = new EventFactory() + private static EventFactory factory = new EventFactory() { @Override public MyEvent newInstance() @@ -24,7 +24,7 @@ public MyEvent newInstance() } }; - private static EventHandler HANDLER_1 = new EventHandler() + private static EventHandler handler1 = new EventHandler() { @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception @@ -33,7 +33,7 @@ public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exc } }; - private static EventHandler HANDLER_2 = new EventHandler() + private static EventHandler handler2 = new EventHandler() { @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception @@ -42,7 +42,7 @@ public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exc } }; - private static EventHandler HANDLER_3 = new EventHandler() + private static EventHandler handler3 = new EventHandler() { @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception @@ -53,9 +53,9 @@ public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exc public static void main(String[] args) { - Disruptor disruptor = new Disruptor(FACTORY, 1024, DaemonThreadFactory.INSTANCE); + Disruptor disruptor = new Disruptor(factory, 1024, DaemonThreadFactory.INSTANCE); - disruptor.handleEventsWith(HANDLER_1).then(HANDLER_2).then(HANDLER_3); + disruptor.handleEventsWith(handler1).then(handler2).then(handler3); disruptor.start(); } From 3975d23fec7f8f951b57cae4fb7ff56f5143a44d Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Fri, 15 Jul 2016 16:22:39 +1200 Subject: [PATCH 6/6] Set revision 3.3.5. --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index d0efede65..0a1247510 100644 --- a/build.gradle +++ b/build.gradle @@ -24,7 +24,7 @@ apply plugin: 'idea' defaultTasks 'build' group = 'com.lmax' -version = new Version(major: 3, minor: 3, revision: 5, stage: "rc2") +version = new Version(major: 3, minor: 3, revision: 5) ext { fullName = 'Disruptor Framework'