Wikipedia edits

All edit actions on Wikipedia are recorded and made available as a stream of data (see https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams for further details). One way to process-mine this stream is to treat the page being edited as the instance (case) of the editing process and the type of edit action as the activity name.
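To make this mapping concrete, the following sketch groups a few hand-written edit records by page title, so that each page becomes a case and the sequence of its action types becomes a trace. The `EditsAsTraces` class, its `Edit` type, and the sample page titles are hypothetical and only serve the illustration; they are not part of Beamline or of the Wikimedia API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: Edit and the sample data are hypothetical.
final class EditsAsTraces {

   // One edit action: the wiki plays the role of the process, the page
   // title is the case id, and the action type is the activity name.
   static final class Edit {
      final String wiki;
      final String title;
      final String type;

      Edit(String wiki, String title, String type) {
         this.wiki = wiki;
         this.title = title;
         this.type = type;
      }
   }

   // Groups the activity names by case id, preserving arrival order.
   static Map<String, List<String>> tracesByPage(List<Edit> edits) {
      Map<String, List<String>> traces = new LinkedHashMap<>();
      for (Edit e : edits) {
         traces.computeIfAbsent(e.title, k -> new ArrayList<>()).add(e.type);
      }
      return traces;
   }

   public static void main(String[] args) {
      List<Edit> edits = List.of(
         new Edit("enwiki", "Page A", "new"),
         new Edit("enwiki", "Page B", "edit"),
         new Edit("enwiki", "Page A", "edit"),
         new Edit("enwiki", "Page A", "categorize"));
      System.out.println(tracesByPage(edits));
   }
}
```

In a streaming setting the traces are never materialized like this; the grouping only shows which field plays which role.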

To achieve this, we can write a new BeamlineAbstractSource that consumes the stream of edits and produces a stream of BEvents that can then be forwarded to one of the miners. We start by defining the set of wikis we want to keep (in this case, only edits to the English version of Wikipedia, i.e., enwiki):

List<String> processesToStream = Arrays.asList("enwiki");

Next, we can write the code that transforms the JSON produced by the Wikipedia stream into a stream of BEvents:

Client client = ClientBuilder.newClient();
WebTarget target = client.target("https://stream.wikimedia.org/v2/stream/recentchange");
SseEventSource source = SseEventSource.target(target).reconnectingEvery(5, TimeUnit.SECONDS).build();
source.register(new Consumer<InboundSseEvent>() {
   @Override
   public void accept(InboundSseEvent t) {
      String data = t.readData();
      if (data != null) {
         JSONObject obj = new JSONObject(data);

         String processName = obj.getString("wiki");
         String caseId = obj.getString("title");
         String activityName = obj.getString("type");

         if (processesToStream.contains(processName)) {
            // prepare the actual event
            try {
               buffer.add(BEvent.create(processName, caseId, activityName));
            } catch (EventException e) {
               e.printStackTrace();
            }
         }
      }
   }
});
source.open();
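The SseEventSource client hides the wire format of the stream. As a rough illustration of where the string returned by readData() comes from, the sketch below extracts the payloads from raw `text/event-stream` text: events are separated by blank lines and the payload travels on `data:` lines. The `SseFrames` class is hypothetical and deliberately simplified (it ignores the `event:`, `id:` and `retry:` fields), and any sample payload fed to it is hand-written rather than captured from Wikimedia.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simplified reader for the text/event-stream format.
final class SseFrames {

   // Returns the data payload of every complete event in the raw stream text.
   static List<String> dataPayloads(String raw) {
      List<String> payloads = new ArrayList<>();
      StringBuilder data = new StringBuilder();
      boolean hasData = false;
      for (String line : raw.split("\r?\n", -1)) {
         if (line.isEmpty()) {
            // a blank line terminates the current event
            if (hasData) {
               payloads.add(data.toString());
            }
            data.setLength(0);
            hasData = false;
         } else if (line.startsWith("data:")) {
            String value = line.substring(5);
            if (value.startsWith(" ")) {
               value = value.substring(1); // one leading space is not part of the payload
            }
            if (hasData) {
               data.append('\n'); // several data lines are joined with newlines
            }
            data.append(value);
            hasData = true;
         }
         // other fields and comment lines (starting with ':') are skipped
      }
      return payloads;
   }
}
```

An event that is not yet terminated by a blank line is not returned, which mirrors how a streaming client only dispatches complete events.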

This code can be wrapped in a thread that runs for the whole lifetime of the source and stores each event in a buffer for later dispatching:

public class WikipediaEditSource extends BeamlineAbstractSource {

   private static final long serialVersionUID = 608025607423103621L;
   private static List<String> processesToStream = Arrays.asList("enwiki");

   public void run(SourceContext<BEvent> ctx) throws Exception {
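      // shared with the listener thread, so it must be a thread-safe queue (java.util.concurrent)
      Queue<BEvent> buffer = new ConcurrentLinkedQueue<>();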

      new Thread(new Runnable() {
         @Override
         public void run() {
            // code from previous listing
            // ...
         }
      }).start();

      while(isRunning()) {
         while (isRunning() && buffer.isEmpty()) {
            Thread.sleep(100L);
         }
         if (isRunning()) {
            synchronized (ctx.getCheckpointLock()) {
               BEvent e = buffer.poll();
               ctx.collect(e);
            }
         }
      }
   }
}
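Because the listener thread and the run loop touch the same buffer, the queue has to be safe for concurrent use. The following sketch shows one possible hand-off using only the JDK: a `LinkedBlockingQueue` whose timed `poll` replaces the sleep-and-check loop. Plain strings stand in for BEvent objects, a fixed item count stands in for isRunning(), and the `BufferHandOff` class is hypothetical; this is not how the Beamline example itself is written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: plain strings stand in for BEvent objects.
final class BufferHandOff {

   // Starts a producer thread and collects `expected` items on the calling thread.
   static List<String> produceAndDrain(int expected) {
      BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

      // plays the role of the SSE listener thread
      Thread producer = new Thread(() -> {
         for (int i = 0; i < expected; i++) {
            buffer.add("event-" + i);
         }
      });
      producer.start();

      // plays the role of the loop that forwards events downstream
      List<String> collected = new ArrayList<>();
      try {
         while (collected.size() < expected) {
            // waits up to 100 ms for an element instead of sleeping blindly
            String e = buffer.poll(100, TimeUnit.MILLISECONDS);
            if (e != null) {
               collected.add(e);
            }
         }
         producer.join();
      } catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("interrupted while draining", ex);
      }
      return collected;
   }
}
```

The timed poll returns null when nothing arrives in time, which gives the loop a chance to re-check its running condition.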

A simple consumer, in this case the Trivial discovery miner, can then be attached to the source with:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
   .addSource(new WikipediaEditSource())
   .keyBy(BEvent::getProcessName)
   .flatMap(new DirectlyFollowsDependencyDiscoveryMiner().setModelRefreshRate(10).setMinDependency(0))
   .addSink(new SinkFunction<ProcessMap>(){
      public void invoke(ProcessMap value, Context context) throws Exception {
         value.generateDot().exportToSvg(new File("src/main/resources/output/output.svg"));
      }
   });
env.execute();
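Conceptually, a directly-follows map counts how often one activity is immediately followed by another within the same case. The sketch below illustrates that idea with JDK collections only; the `DirectlyFollowsCounter` class is hypothetical and is not the implementation used by the Beamline miner, which additionally handles model refresh and dependency thresholds.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: not the Beamline miner, just the counting idea behind it.
final class DirectlyFollowsCounter {

   // last activity seen for each case (here: for each page title)
   private final Map<String, String> lastActivityPerCase = new HashMap<>();
   // number of times "from->to" was observed within a single case
   private final Map<String, Integer> relationCounts = new HashMap<>();

   // Registers one event and updates the relation it closes, if any.
   void observe(String caseId, String activity) {
      String previous = lastActivityPerCase.put(caseId, activity);
      if (previous != null) {
         relationCounts.merge(previous + "->" + activity, 1, Integer::sum);
      }
   }

   // Returns how often `from` was directly followed by `to`.
   int count(String from, String to) {
      return relationCounts.getOrDefault(from + "->" + to, 0);
   }
}
```

Events of different pages may interleave freely in the stream; only the previous activity of the same page matters for the count.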

After running the system for a couple of minutes, the following map was produced:

[Figure: process map with four activities, each shown with a value and a count: new 0.035 (34), log 0.13 (121), edit 1.0 (963), categorize 0.43 (412).
Relations: new->edit 0.018 (3); new->categorize 0.0058 (1); log->new 0.0058 (1); log->log 0.21 (36); log->edit 0.099 (17); edit->log 0.053 (9); edit->edit 0.87 (149); categorize->new 0.0058 (1); categorize->edit 0.0058 (1); categorize->categorize 1.0 (171).
An unlabeled node has outgoing edges to all four activities.]
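The values in the map are consistent with each count being divided by the largest count of its kind (963 for activities, 171 for relations) and shown with two significant digits. This reading is inferred from the numbers themselves, not taken from the Beamline documentation; the sketch below, with a hypothetical `RelativeFrequency` helper, reproduces the values under that assumption.

```java
import java.math.BigDecimal;
import java.math.MathContext;

// Illustrative only: assumes value = count / largest count, two significant digits.
final class RelativeFrequency {

   // Divides a count by the largest count and keeps two significant digits.
   static double of(int count, int maxCount) {
      return new BigDecimal((double) count / maxCount)
         .round(new MathContext(2))
         .doubleValue();
   }

   public static void main(String[] args) {
      System.out.println("categorize: " + of(412, 963));
      System.out.println("edit->edit: " + of(149, 171));
      System.out.println("new->edit:  " + of(3, 171));
   }
}
```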

The complete code of this example is available in the GitHub repository https://github.com/beamline/examples/tree/master/src/main/java/beamline/examples/wikipedia.