
OpenSky monitoring

OpenSky Network is a non-profit project which

[...] consists of a multitude of sensors connected to the Internet by volunteers, industrial supporters, and academic/governmental organizations. All collected raw data is archived in a large historical database. The database is primarily used by researchers from different areas to analyze and improve air traffic control technologies and processes.

-- Description from https://opensky-network.org/about/about-us

Essentially, each airplane uses a transponder to transmit data regarding its status (squawk) as well as its callsign and current position. All this information is collected by OpenSky Network and made available through their APIs.

The underlying idea of this example is that each flight (i.e., an airplane callsign) represents an instance of a flight process. The different squawks a plane goes through indicate the activities involved in the process.

To achieve this goal, it is necessary to write a new source which periodically queries the OpenSky Network APIs to retrieve the live status of airplanes in a certain area. The first step is to create the actual source and initialize the OpenSkyApi wrapper:

public class OpenSkySource extends BeamlineAbstractSource {
    private OpenSkyApi api;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read the OpenSky credentials from a local properties file
        Properties prop = new Properties();
        try (FileInputStream in = new FileInputStream("./openskyCredentials.properties")) {
            prop.load(in);
        }
        api = new OpenSkyApi(prop.getProperty("USERNAME"), prop.getProperty("PASSWORD"));
    }
Please note that this example uses the OpenSky Java API (imported from JitPack) and assumes the presence of a file openskyCredentials.properties containing the username and password for accessing the APIs, stored under the keys USERNAME and PASSWORD.
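As an illustration of the file format assumed above, the following minimal sketch parses credentials in the standard java.util.Properties format and checks that both keys read by the source are present. The class name CredentialsExample, the method parseCredentials and the sample values are hypothetical and only serve this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: shows the two keys that OpenSkySource.open() reads.
final class CredentialsExample {

    // Parses text in java.util.Properties format and verifies both keys are set.
    static Properties parseCredentials(String content) {
        Properties prop = new Properties();
        try {
            prop.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String key : new String[] {"USERNAME", "PASSWORD"}) {
            String value = prop.getProperty(key);
            if (value == null || value.isBlank()) {
                throw new IllegalArgumentException("Missing property: " + key);
            }
        }
        return prop;
    }

    public static void main(String[] args) {
        // Placeholder values; a real file would hold your own OpenSky account details.
        String fileContent = "USERNAME=my-user\nPASSWORD=my-secret\n";
        Properties prop = parseCredentials(fileContent);
        System.out.println("Loaded credentials for " + prop.getProperty("USERNAME"));
    }
}
```

Failing early on a missing key gives a clearer error than passing null values to the API wrapper.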

Once the source is connected to the APIs, the run method can start a separate thread that queries the APIs every 15 seconds and puts the resulting events into a buffer, from which they are then dispatched. In this example, the APIs are queried for flights over central Europe (see the bounding box passed to api.getStates). In addition, the squawks are translated into a more understandable form, following the interpretation reported at https://www.flightradars.eu/squawkcodes.html. The code of the method squawkToString is omitted on this page but is available in the GitHub repository.

    @Override
    public void run(SourceContext<BEvent> ctx) throws Exception {
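        // The buffer is shared between the polling thread and the dispatching loop,
        // so a thread-safe queue (java.util.concurrent) is used
        Queue<BEvent> buffer = new ConcurrentLinkedQueue<>();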

        new Thread(() -> {
            while(isRunning()) {
                try {
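                    // Query all states inside a bounding box over central Europe
                    // (arguments: min/max latitude, then min/max longitude)
                    OpenSkyStates os = api.getStates(0, null,
                        new OpenSkyApi.BoundingBox(
                            35.0518857, 62.4097744,
                            -5.8468354, 34.3186395));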
                    if (os != null) {
                        for (StateVector sv : os.getStates()) {
                            try {
                                // The callsign may be missing (null) or blank: skip those states
                                if (sv.getCallsign() != null && !sv.getCallsign().isBlank()) {
                                    buffer.add(
                                        BEvent.create(
                                            "squawk",
                                            sv.getCallsign().trim(),
                                            squawkToString(sv.getSquawk())));
                                }
                            } catch (EventException e) {
                                e.printStackTrace();
                            }
                        }
                    } else {
                        System.out.println("No new information...");
                    }
                    Thread.sleep(15000L);
                } catch (Exception e) {
                    // log the failure; the loop tries again on its next iteration
                    e.printStackTrace();
                }
            }
        }).start();

        while(isRunning()) {
            while (isRunning() && buffer.isEmpty()) {
                Thread.sleep(100L);
            }
            if (isRunning()) {
                synchronized (ctx.getCheckpointLock()) {
                    BEvent e = buffer.poll();
                    ctx.collect(e);
                }
            }
        }
    }
}
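To give an idea of what the omitted squawkToString helper might look like, here is a hypothetical sketch; it is not the implementation from the GitHub repository. It maps the three internationally reserved emergency codes (7500, 7600, 7700) to descriptive labels and treats everything else as "Transit". The class name SquawkCodes, the "Unknown" fallback and all label names other than "Transit" are assumptions made for this example.

```java
import java.util.Map;

// Hypothetical sketch; the actual squawkToString is in the GitHub repository.
final class SquawkCodes {

    // Internationally reserved transponder codes and an assumed label for each.
    private static final Map<String, String> RESERVED = Map.of(
        "7500", "Unlawful interference",
        "7600", "Radio failure",
        "7700", "Emergency");

    // Translates a raw squawk into an activity name; unlisted codes count as transit.
    static String squawkToString(String squawk) {
        if (squawk == null || squawk.isBlank()) {
            return "Unknown"; // assumed label for states without a squawk
        }
        return RESERVED.getOrDefault(squawk.trim(), "Transit");
    }

    public static void main(String[] args) {
        System.out.println(squawkToString("7700")); // prints "Emergency"
        System.out.println(squawkToString("1000")); // prints "Transit"
    }
}
```

Since the returned string becomes the activity name of the event, keeping the set of labels small keeps the discovered process map readable.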

A simple consumer, in this case the Trivial discovery miner, can then be attached to the source with:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
   .addSource(new OpenSkySource())
   .keyBy(BEvent::getProcessName)
   .flatMap(new DirectlyFollowsDependencyDiscoveryMiner().setModelRefreshRate(10).setMinDependency(0))
   .addSink(new SinkFunction<ProcessMap>(){
      @Override
      public void invoke(ProcessMap value, Context context) throws Exception {
         // overwrite the SVG every time the miner emits a refreshed model
         value.generateDot().exportToSvg(new File("src/main/resources/output/output.svg"));
      }
   });
env.execute();

After running the system for a few minutes, the following map was produced, where essentially only transit squawks were observed:

[Figure: process map with a single activity, Transit, labeled 1.0 (359), and a self-loop on it labeled 1.0 (334).]
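The numbers in parentheses appear to be absolute frequencies of the activity and of the directly-follows relation. As a simplified illustration of how such counts can arise (this is not Beamline's implementation), the sketch below remembers the last activity seen for each case and increments a counter whenever a new event follows it. The class DirectlyFollowsCounter and the callsigns used are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of directly-follows counting over an event stream.
final class DirectlyFollowsCounter {
    private final Map<String, String> lastActivityPerCase = new HashMap<>();
    private final Map<String, Integer> activityCounts = new HashMap<>();
    private final Map<String, Integer> relationCounts = new HashMap<>();

    // Registers one event: counts the activity and, if the case was already
    // seen, the relation from its previous activity to this one.
    void observe(String caseId, String activity) {
        activityCounts.merge(activity, 1, Integer::sum);
        String previous = lastActivityPerCase.put(caseId, activity);
        if (previous != null) {
            relationCounts.merge(previous + " -> " + activity, 1, Integer::sum);
        }
    }

    int activityCount(String activity) {
        return activityCounts.getOrDefault(activity, 0);
    }

    int relationCount(String from, String to) {
        return relationCounts.getOrDefault(from + " -> " + to, 0);
    }

    public static void main(String[] args) {
        DirectlyFollowsCounter counter = new DirectlyFollowsCounter();
        // Two flights polled repeatedly, always reporting a transit squawk
        counter.observe("DLH123", "Transit");
        counter.observe("AFR456", "Transit");
        counter.observe("DLH123", "Transit");
        counter.observe("DLH123", "Transit");
        counter.observe("AFR456", "Transit");
        System.out.println(counter.activityCount("Transit"));            // prints 5
        System.out.println(counter.relationCount("Transit", "Transit")); // prints 3
    }
}
```

Because the source emits one event per callsign at every poll, a flight that keeps the same squawk produces a self-loop, and the first event of each flight has no predecessor, which is why the relation count stays below the activity count.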

The complete code of this example is available in the GitHub repository https://github.com/beamline/examples/tree/master/src/main/java/beamline/examples/opensky.

Scientific literature