How to query a Redis server using a streaming input PCollection?

Using RedisIO, I'm trying to query a Redis server with the elements of a PCollection.

The Redis server is fine and responds correctly, but only when the pipeline runs in batch mode (no streaming).

However, when I use streaming input data (from files) like this:

    PCollection<String> stream = pipeline
        // Poll the directory every 60 seconds for new files, with no termination condition.
        .apply("ReadMyFile", TextIO.read()
            .from("/home/out/**")
            .watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))
        .apply("ParseFn", ParDo.of(new ParseFn()))
        .apply("GlobalString", GlobalString.get(Duration.ZERO, Duration.standardSeconds(60)));

Then I apply the RedisIO readAll() transform:

    PCollection<KV<String, String>> redis = stream
        .apply(RedisIO.readAll().withEndpoint("127.0.0.1", 6379));
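
For context, my understanding is that readAll() treats each element of the input PCollection as a Redis key pattern and emits one KV per matching key. The sketch below only illustrates that per-element behaviour in plain Java, without Beam or Redis. The class name, the in-memory map standing in for the server, and the simplified glob handling (only `*` and `?`) are all assumptions made for illustration, not the RedisIO implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical illustration: what "one input element = one key pattern" means.
public class KeyPatternLookupSketch {

    // Translate a simplified glob (only '*' and '?') into a regex.
    // Real Redis patterns support more syntax, e.g. character classes.
    static Pattern globToRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char ch : glob.toCharArray()) {
            if (ch == '*') {
                regex.append(".*");
            } else if (ch == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(ch)));
            }
        }
        return Pattern.compile(regex.toString());
    }

    // For one input element (a key pattern), return every matching
    // entry of the stand-in store as "key=value".
    static List<String> lookup(Map<String, String> store, String keyPattern) {
        Pattern pattern = globToRegex(keyPattern);
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, String> entry : store.entrySet()) {
            if (pattern.matcher(entry.getKey()).matches()) {
                matches.add(entry.getKey() + "=" + entry.getValue());
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the Redis server (sorted for stable output).
        Map<String, String> store = new TreeMap<>();
        store.put("user:1", "alice");
        store.put("user:2", "bob");
        store.put("order:9", "book");

        System.out.println(lookup(store, "user:*"));  // both user keys
        System.out.println(lookup(store, "order:9")); // exact key
        System.out.println(lookup(store, "missing")); // no match, empty list
    }
}
```

If that understanding is right, the strings produced by ParseFn need to be keys (or patterns) that actually exist on the server, otherwise the output collection stays empty even when the files are read correctly.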

Finally, I want to use the resulting collection:

    PCollection<String> result = redis.apply("Compose Final Object",
        ParDo.of(new DoFn<KV<String, String>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Print each key returned by the Redis read, then emit it downstream.
            System.out.println(c.element().getKey());
            c.output(c.element().getKey());
          }
        }));

As far as I have tested, the files are being loaded and processed as expected.
