記事一覧はこちら

JavaからTwitterのデータをgoogle BigQueryにぶち込む方法

つまり、javaからtwitter streaming apiを使う方法と、javaからgoogle big queryのinsertを発行する方法。環境はeclipse 最終的なソースはこちら gist.github/BigQuery.java 以下蛇足

Twitter

大正義Twitter4J - A Java library for the Twitter APIを使う。いつのまにかStreamingAPI上で生のjsonも取れるようになってさらい最強に。 特に書くことは無いからコード読んで。jar突っ込めばおk。Twitterのアプリの作り方はググって。

BigQuery

参考サイト。もといコピペ元サイト JavaでGCS(Google Cloud Storage)とBigQuery周りの開発をする際のハマりどころ | Oreradio.memo BigQuery API Quickstart - BigQuery — Google Cloud Platform BigQuery API Quickstart Complete Source Code (Java) error - Stack Overflow How to create a BigQuery dataset and a table/schema from Java client (without CSV file) - Stack Overflow gcp - GAEからBigQueryへのデータ投入が上手くいかない - Qiita Query Reference - BigQuery — Google Cloud Platform 便利な関数一覧 GoogleCloudPlatform - 便利な関数の甘い罠 - Qiita TIMESTAMP型は全てUTCです! Preparing Data for BigQuery - BigQuery — Google Cloud Platform型の一覧。文字はUTF8で2MBまで、INTEGERは64bit符号付き

では開始

必要なライブラリをDL

Google APIs Client Libraries  |  API Discovery Service  |  Google DevelopersからJavaSamplesを選んで imgTemp-2015-09-19-12-39-13 Supported Google APIs  |  API Client Library for Java  |  Google Developersからbig queryのv2を選んで imgTemp-2015-09-19-12-42-01 BigQuery API Client Library for Java  |  API Client Library for Java  |  Google Developersからファイルを落とす imgTemp-2015-09-19-12-43-23

zipの中身はこんな感じ。上のディレクトリに選んだライブラリ(BigQuery)のjarがあって、libsの下にその他全般的なjarがあるって感じかな。 試しにBigQuery以外のjar カレンダーだけど を選んで落としてみたらそんな感じだった。 imgTemp-2015-09-19-12-45-53imgTemp-2015-09-19-12-47-20

サンプルにしたサイトが使っていたからという理由でguava-libraries - Guava: Google Core Libraries for Java 1.6+ - Google Project Hostingを落とす。jar一つ

pk12証明書を落とす

この画面に行ってサービスアカウントを選ぶ。 imgTemp-2015-09-19-12-57-26imgTemp-2015-09-19-12-59-02 んで鍵ファイルを作ると imgTemp-2015-09-19-13-02-00imgTemp-2015-09-19-13-02-34imgTemp-2015-09-19-13-03-57imgTemp-2015-09-19-13-07-42imgTemp-2015-09-19-13-10-26 プロジェクトIDもメモっておく。黒塗りばっかで分かりにくいけど、赤の部分。プロジェクト番号は数字オンリーで、プロジェクトIDは英数字のやつな。 あと、@developer.なんとかってメールアドレスもコピー。これはapiの接続に使う。自分のgoogleアカウントのメアド入れてなんで繋がらないのか5分悩んだ。 imgTemp-2015-09-19-13-38-02

コード

上記サイトをコピペしてくっつけました 指定したapiTwitterに接続する。指定したgoogleApiの秘密鍵で指定したプロジェクトの、指定したデータセットの中に指定したテーブルを2つ作る。 片方はstreamingAPIの生のjsonが入って、片方にはちょっと読みやすいテキストが入る。 プロジェクトの一覧を取得する、データセットの一覧を取得する、テーブルの一覧を取得する、テーブルをプログラムから作る。任意のクエリを投げる、streamingInsertをプログラムから試す。 が、あるのでこれで大体は使えると思います。 日付型はcom.google.api.client.util.DateTime型にしないとエラーが出るから注意! StreamInsertは1リクエストに200msくらいかかった(さくらvps リージョン忘れた環境)のでフォロワーの多い人とか、sampleストリームは余裕で間に合わない。その為に複数個のリクエストをまとめて送りましょうってこったな。

#Fushihara
twitter.consumerKey=xxxxxx
twitter.consumerSecret=xxxxxx
twitter.accessToken=xxxxxx
twitter.accessTokenSecret=xxxxxx

#big query
bq.mail=xxxxxx@developer.gserviceaccount.com
bq.app=test app 20150919
bq.p12Path=C:/xxxxxx/web-storage-xxxxxx-key.p12
bq.projectid=xxxxxx
bq.dataset=twitter
bq.table=home_timeline
bq.tableRawJson=home_timeline_raw

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ResourceBundle;
// 2015-09-19 18:06
public class SendBigQuery {
    private TwitterStreaming twitter;
    private BigQuery bigQuery;
    public static void main(String[] args) {
        SendBigQuery main=new SendBigQuery();
        try {
            main.init();
        } catch (GeneralSecurityException | IOException e) {
            e.printStackTrace();
            return;
        }
        main.run();
    }
    public void init() throws GeneralSecurityException, IOException{
        this.twitter=new TwitterStreaming();
        this.twitter.init();
        this.bigQuery=new BigQuery();
        this.bigQuery.initialize();
    }
    public void run(){
        this.twitter.run();
    }
}
import java.io.IOException;
import java.util.ResourceBundle;

import twitter4j.RawStreamListener;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;
// 2015-09-19 18:06
public class TwitterStreaming {
    private Configuration config;

    public void init() {
        ResourceBundle bundle = ResourceBundle.getBundle("setting");
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true);
        cb.setOAuthConsumerKey(bundle.getString("twitter.consumerKey"));
        cb.setOAuthConsumerSecret(bundle.getString("twitter.consumerSecret"));
        cb.setOAuthAccessToken(bundle.getString("twitter.accessToken"));
        cb.setOAuthAccessTokenSecret(bundle.getString("twitter.accessTokenSecret"));
        cb.setDebugEnabled(true);
        this.config = cb.build();
    }

    public void run() {
        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                System.out.println("onStatus:" + status.getText().replaceAll("\n", ""));
                try {
                    BigQuery.getInstance().sendTweetData(status);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                System.out.println("onDelete:" + statusDeletionNotice.getStatusId());
            }

            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                System.out.println("onTrackLimit:" + numberOfLimitedStatuses);
            }

            public void onException(Exception ex) {
                System.err.println("onException");
                ex.printStackTrace();
            }

            @Override
            public void onScrubGeo(long arg0, long arg1) {
                System.out.println("onScrubGeo:" + arg0 + " / " + arg1);
            }

            @Override
            public void onStallWarning(StallWarning arg0) {
                System.out.println("onStallWarning:" + arg0.getMessage());
            }
        };
        RawStreamListener rsl = new RawStreamListener() {

            @Override
            public void onException(Exception arg0) {
                System.err.println("onException:raw:");
                arg0.printStackTrace();
            }

            @Override
            public void onMessage(String arg0) {
                if (arg0 == null) {
                    System.out.println("onMessage:raw:null");
                } else if (arg0.length() ==0) {
                    System.out.println("onMessage:raw:empty" );
                } else if (arg0.length() < 50) {
                    System.out.println("onMessage:raw:" + arg0);
                } else {
                    System.out.println("onMessage:raw:" + arg0.substring(0, 49));
                }
                if(arg0==null || arg0.length()==0){
                    return;
                }
                try {
                    BigQuery.getInstance().sendRawJson(arg0);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        TwitterStream twitterStream = new TwitterStreamFactory(config).getInstance();
        twitterStream.addListener(listener);
        twitterStream.addListener(rsl);
        twitterStream.user();
    }
}

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.ResourceBundle;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.util.Data;
import com.google.api.client.util.DateTime;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.DatasetList;
import com.google.api.services.bigquery.model.DatasetList.Datasets;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.ProjectList;
import com.google.api.services.bigquery.model.ProjectList.Projects;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableList;
import com.google.api.services.bigquery.model.TableList.Tables;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
// 2015-09-19 18:06
public class BigQuery {
    private final HttpTransport httpTransport;
    private final JsonFactory jsonFactory = new JacksonFactory();
    private GoogleCredential credentials;
    private Bigquery client;
    private String mailAddress;
    private String applicationName;
    private String p12EncryptFilePath;
    private String projectId;
    private String dataset;
    private String table;
    private String tableRawJson;
    private static BigQuery instance;

    public static BigQuery getInstance() {
        return BigQuery.instance;
    }

    public BigQuery() throws GeneralSecurityException, IOException {
        this.httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        BigQuery.instance = this;
    }

    public void initialize() throws GeneralSecurityException, IOException {
        ResourceBundle bundle = ResourceBundle.getBundle("setting");
        this.mailAddress = bundle.getString("bq.mail");
        this.applicationName = bundle.getString("bq.app");
        this.p12EncryptFilePath = bundle.getString("bq.p12Path");
        this.projectId = bundle.getString("bq.projectid");
        this.dataset = bundle.getString("bq.dataset");
        this.table = bundle.getString("bq.table");
        this.tableRawJson = bundle.getString("bq.tableRawJson");
        this.credentials = new GoogleCredential.Builder().setTransport(httpTransport).setJsonFactory(jsonFactory)
                .setServiceAccountId(this.mailAddress)
                .setServiceAccountScopes(ImmutableList.of(BigqueryScopes.BIGQUERY, BigqueryScopes.BIGQUERY_INSERTDATA))
                .setServiceAccountPrivateKeyFromP12File(new File(this.p12EncryptFilePath)).build();

        this.client = new Bigquery.Builder(httpTransport, jsonFactory, credentials).setApplicationName(applicationName)
                .build();
        long maxResults = 1;
        ProjectList projectList = client.projects().list().setMaxResults(maxResults).execute();
        System.out.println(projectList.toPrettyString());
        String query = "SELECT * FROM [twitter.tweet] where [twitter.tweet.message] like '%TGS%'";
        // runQueryRpcAndPrint(this.client, "this.projectId", query,System.out);
        // listProjects(this.client);
        // listDataset(this.client);
        // listTables(this.client, "twitter");
        // insertTable(this.client, this.projectId, this.dataset, this.table,
        // getTwitterLogSchema());
        // insertTable(this.client, this.projectId, this.dataset,
        // this.tableRawJson, getTwitterLogRawJsonSchema());
    }

    public void sendRawJson(String json) throws IOException {
        TableRow row = new TableRow();
        row.set("date", new DateTime(new Date()));// dateTimeにしないとダメ。"JSON map
                                                    // specified for non-record
                                                    // field"と言われる
        row.set("rawJson", json);
        TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows();
        rows.setInsertId(Long.toString(System.nanoTime()));
        rows.setJson(row);
        List<TableDataInsertAllRequest.Rows> rowList = new ArrayList<TableDataInsertAllRequest.Rows>();
        rowList.add(rows);

        TableDataInsertAllRequest content = new TableDataInsertAllRequest().setRows(rowList);
        TableDataInsertAllResponse result = this.client.tabledata()
                .insertAll(this.projectId, this.dataset, this.tableRawJson, content).execute();
        tableDataInsertResult(result);
    }

    public void sendTweetData(twitter4j.Status status) throws IOException {
        TableRow row = new TableRow();
        row.set("tweet_id", status.getId());
        row.set("tweet_message", status.getText());
        row.set("client", status.getSource());
        row.set("created_at", new DateTime(status.getCreatedAt()));
        row.set("user_id", status.getUser().getId());
        row.set("user_name", status.getUser().getName());
        row.set("user_screen_name", status.getUser().getScreenName());
        TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows();
        rows.setInsertId(Long.toString(System.nanoTime()));
        rows.setJson(row);
        List<TableDataInsertAllRequest.Rows> rowList = new ArrayList<TableDataInsertAllRequest.Rows>();
        rowList.add(rows);

        TableDataInsertAllRequest content = new TableDataInsertAllRequest().setRows(rowList);
        TableDataInsertAllResponse result = this.client.tabledata()
                .insertAll(this.projectId, this.dataset, this.table, content).execute();
        tableDataInsertResult(result);
    }

    private static void tableDataInsertResult(TableDataInsertAllResponse result) {
        if(result==null){
            return;
        }
        System.out.println(result.getKind());
        if(result.getInsertErrors()==null){
            return;
        }
        for (InsertErrors error : result.getInsertErrors()) {
            for (ErrorProto rows2 : error.getErrors()) {
                System.err.println(rows2.getDebugInfo());
            }
        }
    }

    private void listProjects(Bigquery bigquery) throws IOException {
        ProjectList result = bigquery.projects().list().execute();
        for (Projects project : result.getProjects()) {
            System.err.println(project.toPrettyString());
        }
    }

    private void listDataset(Bigquery bigquery) throws IOException {
        DatasetList result = bigquery.datasets().list(this.projectId).execute();
        for (Datasets dataset : result.getDatasets()) {
            System.err.println(dataset.toPrettyString());
        }
    }

    private void listTables(Bigquery bigQuery, String datasetName) throws IOException {
        TableList result = bigQuery.tables().list(this.projectId, datasetName).execute();
        for (Tables table : result.getTables()) {
            System.err.println(table.toPrettyString());
        }
    }

    private static void insertTable(Bigquery bigQuery, String projectName, String datasetName, String tableId,
            TableSchema schema) throws IOException {
        Table table = new Table();
        table.setSchema(schema);
        TableReference tableRef = new TableReference();
        tableRef.setDatasetId(datasetName);
        tableRef.setProjectId(projectName);
        tableRef.setTableId(tableId);
        table.setTableReference(tableRef);
        bigQuery.tables().insert(projectName, datasetName, table).execute();
    }

    private static TableSchema getTwitterLogSchema() {
        List<TableFieldSchema> root = new ArrayList<TableFieldSchema>();
        root.add(new TableFieldSchema().setName("tweet_id").setType("INTEGER").setMode("REQUIRED").setDescription(""));
        root.add(new TableFieldSchema().setName("tweet_message").setType("STRING").setMode("REQUIRED")
                .setDescription(""));
        root.add(new TableFieldSchema().setName("client").setType("STRING").setMode("REQUIRED").setDescription(""));
        root.add(new TableFieldSchema().setName("created_at").setType("TIMESTAMP").setMode("REQUIRED")
                .setDescription(""));
        root.add(new TableFieldSchema().setName("user_id").setType("INTEGER").setMode("REQUIRED").setDescription(""));
        root.add(new TableFieldSchema().setName("user_name").setType("STRING").setMode("REQUIRED").setDescription(""));
        root.add(new TableFieldSchema().setName("user_screen_name").setType("STRING").setMode("REQUIRED")
                .setDescription(""));
        TableSchema schema = new TableSchema();
        schema.setFields(root);
        return schema;
    }

    private static TableSchema getTwitterLogRawJsonSchema() {
        List<TableFieldSchema> root = new ArrayList<TableFieldSchema>();
        root.add(new TableFieldSchema().setName("date").setType("TIMESTAMP").setMode("REQUIRED").setDescription(""));
        root.add(new TableFieldSchema().setName("rawJson").setType("STRING").setMode("REQUIRED").setDescription(""));
        TableSchema schema = new TableSchema();
        schema.setFields(root);
        return schema;
    }

    static void runQueryRpcAndPrint(Bigquery bigquery, String projectId, String query, PrintStream out)
            throws IOException {
        QueryRequest queryRequest = new QueryRequest().setQuery(query);
        QueryResponse queryResponse = bigquery.jobs().query(projectId, queryRequest).execute();
        if (queryResponse.getJobComplete()) {
            printRows(queryResponse.getRows(), out);
            if (null == queryResponse.getPageToken()) {
                return;
            }
        }
        String pageToken = null;
        while (true) {
            GetQueryResultsResponse queryResults = bigquery.jobs()
                    .getQueryResults(projectId, queryResponse.getJobReference().getJobId()).setPageToken(pageToken)
                    .execute();
            if (queryResults.getJobComplete()) {
                printRows(queryResults.getRows(), out);
                pageToken = queryResults.getPageToken();
                if (null == pageToken) {
                    return;
                }
            }
        }
    }

    private static void printRows(List<TableRow> rows, PrintStream out) {
        if (rows != null) {
            for (TableRow row : rows) {
                for (TableCell cell : row.getF()) {
                    out.printf("%s, ", Data.isNull(cell.getV()) ? "null" : cell.getV().toString());
                }
                out.println();
            }
        }
    }
}