Package org.apache.storm
Class Testing
- java.lang.Object
-
- org.apache.storm.Testing
-
public class Testing extends Object
A utility that helps with testing topologies, Bolts and Spouts.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static classTesting.CapturedTopology<T>A topology that has all messages captured and can be read later on.static interfaceTesting.ConditionSimply produces a boolean to see if a specific state is true or false.
-
Field Summary
Fields Modifier and Type Field Description static intTEST_TIMEOUT_MSThe default amount of wall time should be spent waiting for specific conditions to happen.
-
Constructor Summary
Constructors Constructor Description Testing()
-
Method Summary
All Methods Static Methods Concrete Methods Deprecated Methods Modifier and Type Method Description static voidadvanceClusterTime(ILocalCluster cluster, Integer secs)Simulated time wait for a cluster.static voidadvanceClusterTime(ILocalCluster cluster, Integer secs, Integer step)Simulated time wait for a cluster.static Testing.CapturedTopology<StormTopology>captureTopology(StormTopology topology)Rewrites a topology so that all the tuples flowing through it are captured.static Map<String,List<FixedTuple>>completeTopology(ILocalCluster cluster, StormTopology topology)Run a topology to completion capturing all of the messages that are emitted.static Map<String,List<FixedTuple>>completeTopology(ILocalCluster cluster, StormTopology topology, CompleteTopologyParam param)Run a topology to completion capturing all of the messages that are emitted.static ILocalClustergetLocalCluster(Map<String,Object> clusterConf)Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ...static intglobalAmt(String id, String key)Deprecated.static <T> booleanisEvery(Collection<T> data, Predicate<T> pred)Convenience method for data.stream.allMatch(pred).static TrackedTopologymkTrackedTopology(ILocalCluster cluster, StormTopology topology)Deprecated.useTrackedTopologydirectly.static <T> Map<T,Integer>multiset(Collection<T> c)Count how many times each element appears in the Collection.static <T> booleanmultiseteq(Collection<T> a, Collection<T> b)Check if two collections are equivalent ignoring the order of elements.static List<List<Object>>readTuples(Map<String,List<FixedTuple>> results, String componentId)Get all of the tuples from a given component on the default stream.static List<List<Object>>readTuples(Map<String,List<FixedTuple>> results, String componentId, String streamId)Get all of the tuples from a given component on a given stream.static voidsimulateWait(ILocalCluster cluster)If using simulated time simulate waiting for 10 seconds.static TupletestTuple(List<Object> values)Create aTuplefor use with testing.static TupletestTuple(List<Object> values, MkTupleParam param)Create aTuplefor use with testing.static Testing.CapturedTopology<TrackedTopology>trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology)Track and capture a topology.static voidtrackedWait(Testing.CapturedTopology<TrackedTopology> topo)Simulated time wait for a tracked topology.static voidtrackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt)Simulated time wait for a tracked topology.static voidtrackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs)Simulated time wait for a tracked topology.static voidtrackedWait(TrackedTopology topo)Simulated time wait for a tracked topology.static voidtrackedWait(TrackedTopology topo, Integer amt)Simulated time wait for a tracked topology.static voidtrackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs)Simulated time wait for a tracked topology.static voidwhileTimeout(long timeoutMs, Testing.Condition condition, Runnable body)Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.static voidwhileTimeout(Testing.Condition condition, Runnable body)Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.static voidwithLocalCluster(MkClusterParam param, TestJob code)Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ...static voidwithLocalCluster(TestJob code)Deprecated.use ``` try (LocalCluster cluster = new LocalCluster()) { ...static voidwithSimulatedTime(Runnable code)Deprecated.use ``` try (Time.SimulatedTime time = new Time.SimulatedTime()) { ...static voidwithSimulatedTimeLocalCluster(MkClusterParam param, TestJob code)Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) { ...static voidwithSimulatedTimeLocalCluster(TestJob code)Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { ...static voidwithTrackedCluster(MkClusterParam param, TestJob code)Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) { ...static voidwithTrackedCluster(TestJob code)Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) { ...
-
-
-
Method Detail
-
whileTimeout
public static void whileTimeout(Testing.Condition condition, Runnable body)
Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.- Parameters:
condition- what we are waiting forbody- what to run in the loop- Throws:
AssertionError- if the loop timed out.
-
whileTimeout
public static void whileTimeout(long timeoutMs, Testing.Condition condition, Runnable body)Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has passed.- Parameters:
timeoutMs- the number of ms to wait before timing out.condition- what we are waiting forbody- what to run in the loop- Throws:
AssertionError- if the loop timed out.
-
isEvery
public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred)
Convenience method for data.stream.allMatch(pred).
-
withSimulatedTime
@Deprecated public static void withSimulatedTime(Runnable code)
Deprecated.use ``` try (Time.SimulatedTime time = new Time.SimulatedTime()) { ... } ```Run with simulated time.- Parameters:
code- what to run
-
withLocalCluster
@Deprecated public static void withLocalCluster(TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster()) { ... } ```Run with a local cluster.- Parameters:
code- what to run
-
withLocalCluster
@Deprecated public static void withLocalCluster(MkClusterParam param, TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```Run with a local cluster.- Parameters:
param- configs to set in the clustercode- what to run
-
getLocalCluster
@Deprecated public static ILocalCluster getLocalCluster(Map<String,Object> clusterConf)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder()....build()) { ... } ```Run with a local cluster.- Parameters:
clusterConf- some configs to set in the cluster
-
withSimulatedTimeLocalCluster
@Deprecated public static void withSimulatedTimeLocalCluster(TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) { ... } ```Run with a local cluster.- Parameters:
code- what to run
-
withSimulatedTimeLocalCluster
@Deprecated public static void withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) { ... } ```Run with a local cluster.- Parameters:
param- configs to set in the clustercode- what to run
-
withTrackedCluster
@Deprecated public static void withTrackedCluster(TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) { ... } ```Run with a local cluster.- Parameters:
code- what to run
-
withTrackedCluster
@Deprecated public static void withTrackedCluster(MkClusterParam param, TestJob code)
Deprecated.use ``` try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) { ... } ```Run with a local tracked cluster.- Parameters:
param- configs to set in the clustercode- what to run
-
globalAmt
@Deprecated public static int globalAmt(String id, String key)
Deprecated.In a tracked topology some metrics are tracked. This provides a way to get those metrics. This is intended mostly for internal testing.- Parameters:
id- the id of the tracked clusterkey- the name of the metric to get.- Returns:
- the metric
-
trackAndCaptureTopology
public static Testing.CapturedTopology<TrackedTopology> trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology)
Track and capture a topology. This is intended mostly for internal testing.
-
captureTopology
public static Testing.CapturedTopology<StormTopology> captureTopology(StormTopology topology)
Rewrites a topology so that all the tuples flowing through it are captured.- Parameters:
topology- the topology to rewrite- Returns:
- the modified topology and a new Bolt that can retrieve the captured tuples.
-
completeTopology
public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException, org.apache.storm.thrift.TException
Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are instances ofCompletableSpout.- Parameters:
cluster- the cluster to submit the topology totopology- the topology itself- Returns:
- a map of the component to the list of tuples it emitted
- Throws:
org.apache.storm.thrift.TException- on any error from nimbusInterruptedException
-
completeTopology
public static Map<String,List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology, CompleteTopologyParam param) throws org.apache.storm.thrift.TException, InterruptedException
Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are instances ofCompletableSpoutor are overwritten by MockedSources in param- Parameters:
cluster- the cluster to submit the topology totopology- the topology itselfparam- parameters to describe how to complete a topology- Returns:
- a map of the component to the list of tuples it emitted
- Throws:
org.apache.storm.thrift.TException- on any error from nimbus.InterruptedException
-
simulateWait
public static void simulateWait(ILocalCluster cluster) throws InterruptedException
If using simulated time simulate waiting for 10 seconds. This is intended for internal testing only.- Throws:
InterruptedException
-
readTuples
public static List<List<Object>> readTuples(Map<String,List<FixedTuple>> results, String componentId)
Get all of the tuples from a given component on the default stream.- Parameters:
results- the results of running a completed topologycomponentId- the id of the component to look at- Returns:
- a list of the tuple values.
-
readTuples
public static List<List<Object>> readTuples(Map<String,List<FixedTuple>> results, String componentId, String streamId)
Get all of the tuples from a given component on a given stream.- Parameters:
results- the results of running a completed topologycomponentId- the id of the component to look atstreamId- the id of the stream to look for.- Returns:
- a list of the tuple values.
-
mkTrackedTopology
@Deprecated public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology)
Deprecated.useTrackedTopologydirectly.Create a tracked topology.
-
trackedWait
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(Testing.CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(TrackedTopology topo)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(TrackedTopology topo, Integer amt)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
trackedWait
public static void trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs)
Simulated time wait for a tracked topology. This is intended for internal testing.
-
advanceClusterTime
public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException
Simulated time wait for a cluster. This is intended for internal testing.- Throws:
InterruptedException
-
advanceClusterTime
public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException
Simulated time wait for a cluster. This is intended for internal testing.- Throws:
InterruptedException
-
multiset
public static <T> Map<T,Integer> multiset(Collection<T> c)
Count how many times each element appears in the Collection.- Parameters:
c- a collection of values- Returns:
- a map of the unique values in c to the count of those values.
-
multiseteq
public static <T> boolean multiseteq(Collection<T> a, Collection<T> b)
Check if two collections are equivalent ignoring the order of elements.
-
testTuple
public static Tuple testTuple(List<Object> values)
Create aTuplefor use with testing.- Parameters:
values- the values to appear in the tuple
-
testTuple
public static Tuple testTuple(List<Object> values, MkTupleParam param)
Create aTuplefor use with testing.- Parameters:
values- the values to appear in the tupleparam- parametrs describing more details about the tuple
-
-