Hello,
I wanted to do a simple query with siddhi which gives me back the hits in a batch. I could easily do it with esper but it seems I had some problems with siddhi. Is there a way to use external time instaed of internal time ? I tried to use External Time Window feature but it only can be used for simple queries, so it is not enough for me.
Here is the poc (ESPER):
Configuration con = new Configuration();
//con.addEventType("LoginType", beanClass);
con.addEventType("LoginType", LoginType.class);
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(con);
String expression = "select count(loginType), timeStamp, userName,loginType,systemType,eventID, count(eventID) as ecntid from LoginType.win:time_batch(5 sec) where eventID='254' and userName='maci' ";
//expression = "select a.timeStamp, a.userName,a.loginType,a.systemType,count(a.eventID ) as ecntid from pattern[every a=LoginType(eventID='254' and count(a.eventID) > 3) where timer:within(5 sec)";
//expression = "select o.timeStamp, o.userName,o.systemType,o.eventID, count(o.eventID), b.timeStamp, b.eventID from pattern[every o=LoginType -> (timer:interval(5 sec) and b=LoginType(eventID=o.eventID) )]";
expression = "select * from LoginType.win:time_batch(5 sec) " +
" match_recognize ( " +
" measures A.eventID as aEID, B.eventID as bEID , A.timeStamp as firstStamp, B.timeStamp as secondStamp " +
" pattern ( A B ) " +
" define " +
" A as A.eventID = '254' and A.userName='maci' , " +
" B as B.eventID = '255' and prev(B.userName) = A.userName " +
" ) ";
//expression = "select a.eventID,b.eventID,a.userName,b.userName,a.timeStamp,b.timeStamp from pattern[every a=LoginType(a.eventID='254') -> (timer:interval(5 sec) and b=LoginType(b.eventID='255'))]";
/*
expression = "select sorted(price desc).take(5) as highestprice " +
" from LoginType.win:time(5 min) ";
*/
EPStatement statement = epService.getEPAdministrator().createEPL(expression);
MyListener listener = new MyListener();
statement.addListener(listener);
Calendar c = Calendar.getInstance();
EPRuntime runtime = epService.getEPRuntime();
runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));
long start = new Date().getTime();
int k = 0;
for (k = 0; k <= 100000000; k++) {
c.add(Calendar.SECOND, 1);
String eventID = "254";
if (k % 3 == 0) {
eventID = "255";
}
runtime.sendEvent(new CurrentTimeEvent(c.getTime().getTime()));
runtime.sendEvent(new LoginType(new Date(c.getTime().getTime()), "type1", "any name", eventID, "windows", new Random().nextInt(1000)));
}
long end = new Date().getTime();
System.out.println(end - start);
I tried to use a similar query in esper but I was not be able to use the timestamp of my logs.
Could you give me a similar example with siddhi ?
Here are my attempts to create the above example with siddhi :
public void externalTimeWindowTest1() throws InterruptedException {
SiddhiManager siddhiManager = new SiddhiManager();
String cseEventStream = "define stream LoginEvents (myTime long, ip string, phone string,price int) ;";
String query = "@info(name = 'query1') from LoginEvents#window.timeBatch(5 sec) "
+ "select myTime, phone, ip, price , max(price) as maxprice, min(price) as minprice, count(myTime) as cntip insert all events into OutPut ";
/*String query = "@info(name='query1') from every a1 = LoginEvents " +
" -> b1 = LoginEvents[b1.ip == a1.ip ]#window.externalTime(b1.myTime,5 second) " +
" within 5 seconds select a1.myTime, a1.phone, a1.ip, a1.price , max(a1.price) as maxprice, min(a1.price) as minprice, count(a1.myTime) as cntip insert current events into Output ";
*/
final ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
EventPrinter.print(inEvents);
System.out.println(new Date(timeStamp));
if (inEvents != null) {
System.out.println("======================== START ===============================");
for (Event e : inEvents) {
if (e.isExpired()) continue;
System.out.println("----------------------------");
System.out.println(new Date(e.getTimestamp()));
System.out.println("IP:" + e.getData(2));
System.out.println("Max price:" + e.getData(4));
System.out.println("Min price:" + e.getData(5));
System.out.println("IP siddhiCount:" + e.getData(6));
System.out.println("Expired :" + e.isExpired());
System.out.println("----------------------------");
}
System.out.println("======================== END ===============================");
}
}
});
executionPlanRuntime.start();
new Thread(new Runnable() {
@Override
public void run() {
Calendar c = Calendar.getInstance();
c.add(Calendar.HOUR, 1);
c.add(Calendar.SECOND, 1);
InputHandler inputHandler = executionPlanRuntime.getInputHandler("LoginEvents");
int i = 0;
for (i = 0; i <= 1000; i++) {
c.add(Calendar.SECOND, 1);
try {
inputHandler.send(c.getTime().getTime(), new Object[]{c.getTime().getTime(), new String("192.10.1.1"), "1", new Random().nextInt(1000)});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
sleep(15000);
executionPlanRuntime.shutdown();
System.out.println("Done");
}
Version :
✘~/Downloads/tmp/siddhi/siddhi ➦ e18aaef git branch | sed -n '/* /s///p'
(HEAD detached at v3.0.1)
~/Downloads/tmp/siddhi/siddhi ➦ e18aaef git status
HEAD detached at v3.0.1
nothing to commit, working directory clean