public class LogAnalysis {

    /**
     * Entry point: consumes raw access-log lines from Kafka, parses them into
     * structured records, registers them as the "logs" table, and launches three
     * streaming SQL statistics jobs (hot section, hot article, per-client access).
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails to start or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5s to HDFS so Kafka offsets and state survive failures.
        senv.enableCheckpointing(5000L);
        senv.setStateBackend(new FsStateBackend("hdfs://kms-1:8020/flink-checkpoints"));
        // Retry the job up to 3 times, 2 seconds apart, before failing for good.
        senv.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(2, TimeUnit.SECONDS)));

        // Blink planner in streaming mode is needed for the temporal-join SQL
        // statements issued by the statistics methods below.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv, settings);

        Properties props = new Properties();
        props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
        props.put("group.id", "log_consumer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("user_access_logs", new SimpleStringSchema(), props);

        DataStreamSource<String> logSource = senv.addSource(kafkaConsumer);
        DataStream<AccessLogRecord> availableAccessLog = LogAnalysis.getAvailableAccessLog(logSource);
        DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog =
                LogAnalysis.getFieldFromLog(availableAccessLog);

        // Register the parsed stream as a view; $("proctime").proctime() appends a
        // processing-time attribute used by the FOR SYSTEM_TIME AS OF lookup joins.
        tEnv.createTemporaryView("logs", fieldFromLog,
                $("clientIP"), $("accessDate"), $("sectionId"), $("articleId"),
                $("proctime").proctime());

        LogAnalysis.getHotSection(tEnv);
        LogAnalysis.getHotArticle(tEnv);
        LogAnalysis.getClientAccess(tEnv);
        // BUG FIX: job name had a typo ("log-analysisi").
        senv.execute("log-analysis");
    }
private static void getClientAccess(StreamTableEnvironment tEnv) { String client_ip_access_ddl = "" + "CREATE TABLE client_ip_access (\n" + " client_ip STRING ,\n" + " client_access_cnt BIGINT,\n" + " statistic_time STRING,\n" + " PRIMARY KEY (client_ip) NOT ENFORCED\n" + ")WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" + " 'table-name' = 'client_ip_access', \n" + " 'driver' = 'com.mysql.jdbc.Driver',\n" + " 'username' = 'root',\n" + " 'password' = '123qwe'\n" + ") ";
tEnv.executeSql(client_ip_access_ddl);
String client_ip_access_sql = "" + "INSERT INTO client_ip_access\n" + "SELECT\n" + " clientIP,\n" + " count(1) AS access_cnt,\n" + " FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n" + "FROM\n" + " logs \n" + "WHERE\n" + " articleId <> 0 \n" + " OR sectionId <> 0 \n" + "GROUP BY\n" + " clientIP " ; tEnv.executeSql(client_ip_access_sql);
}
    /**
     * Hot-article statistics: joins the log stream against the forum post
     * dimension table to resolve article subjects, then writes the top-10
     * articles by access count to MySQL.
     *
     * @param tEnv table environment the DDL/DML statements are executed against
     */
    private static void getHotArticle(StreamTableEnvironment tEnv) {
        // JDBC dimension table mapping article/thread id (tid) -> subject.
        String pre_forum_post_ddl = "" + "CREATE TABLE pre_forum_post (\n" + " tid INT,\n" + " subject STRING,\n" + " PRIMARY KEY (tid) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n" + " 'table-name' = 'pre_forum_post', \n" + " 'driver' = 'com.mysql.jdbc.Driver',\n" + " 'username' = 'root',\n" + " 'password' = '123qwe'\n" + ")";
        tEnv.executeSql(pre_forum_post_ddl);
        // MySQL upsert sink for the top-10 result, keyed on article_id.
        String hot_article_ddl = "" + "CREATE TABLE hot_article (\n" + " article_id INT,\n" + " subject STRING,\n" + " article_pv BIGINT ,\n" + " statistic_time STRING,\n" + " PRIMARY KEY (article_id) NOT ENFORCED\n" + ")WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" + " 'table-name' = 'hot_article', \n" + " 'driver' = 'com.mysql.jdbc.Driver',\n" + " 'username' = 'root',\n" + " 'password' = '123qwe'\n" + ")";
        tEnv.executeSql(hot_article_ddl);
        // Processing-time lookup join (FOR SYSTEM_TIME AS OF a.proctime) against the
        // JDBC dimension table, then ORDER BY count DESC LIMIT 10.
        // NOTE(review): ORDER BY + LIMIT on a non-time attribute in streaming mode
        // relies on the Blink planner's Top-N support — confirm the planner accepts
        // this form; the ROW_NUMBER() OVER (...) pattern is the documented variant.
        String hot_article_sql = "" + "INSERT INTO hot_article\n" + "SELECT \n" + " a.articleId,\n" + " b.subject,\n" + " count(1) as article_pv,\n" + " FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n" + "FROM logs a \n" + " JOIN pre_forum_post FOR SYSTEM_TIME AS OF a.proctime as b ON a.articleId = b.tid\n" + "WHERE a.articleId <> 0\n" + "GROUP BY a.articleId,b.subject\n" + "ORDER BY count(1) desc\n" + "LIMIT 10";
        tEnv.executeSql(hot_article_sql);
    }
    /**
     * Hot-section statistics: joins the log stream against the forum section
     * dimension table to resolve section names, then writes the top-10 sections
     * by access count to MySQL.
     *
     * @param tEnv table environment the DDL/DML statements are executed against
     */
    public static void getHotSection(StreamTableEnvironment tEnv) {
        // JDBC dimension table mapping forum/section id (fid) -> name, with a
        // lookup cache enabled.
        // NOTE(review): 'lookup.cache.ttl' = '10' has no time unit — Flink parses
        // unit-less durations as milliseconds, so this cache expires after 10 ms.
        // Confirm whether '10s'/'10min' was intended.
        String pre_forum_forum_ddl = "" + "CREATE TABLE pre_forum_forum (\n" + " fid INT,\n" + " name STRING,\n" + " PRIMARY KEY (fid) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n" + " 'table-name' = 'pre_forum_forum', \n" + " 'driver' = 'com.mysql.jdbc.Driver',\n" + " 'username' = 'root',\n" + " 'password' = '123qwe',\n" + " 'lookup.cache.ttl' = '10',\n" + " 'lookup.cache.max-rows' = '1000'" + ")";
        tEnv.executeSql(pre_forum_forum_ddl);
        // MySQL upsert sink for the top-10 result, keyed on section_id.
        String hot_section_ddl = "" + "CREATE TABLE hot_section (\n" + " section_id INT,\n" + " name STRING ,\n" + " section_pv BIGINT,\n" + " statistic_time STRING,\n" + " PRIMARY KEY (section_id) NOT ENFORCED \n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" + " 'table-name' = 'hot_section', \n" + " 'driver' = 'com.mysql.jdbc.Driver',\n" + " 'username' = 'root',\n" + " 'password' = '123qwe'\n" + ")";
        tEnv.executeSql(hot_section_ddl);
        // Processing-time lookup join against the dimension table, then Top-10 by
        // access count (same ORDER BY ... LIMIT pattern as getHotArticle).
        String hot_section_sql = "" + "INSERT INTO hot_section\n" + "SELECT\n" + " a.sectionId,\n" + " b.name,\n" + " count(1) as section_pv,\n" + " FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time \n" + "FROM\n" + " logs a\n" + " JOIN pre_forum_forum FOR SYSTEM_TIME AS OF a.proctime as b ON a.sectionId = b.fid \n" + "WHERE\n" + " a.sectionId <> 0 \n" + "GROUP BY a.sectionId, b.name\n" + "ORDER BY count(1) desc\n" + "LIMIT 10";
        tEnv.executeSql(hot_section_sql);
    }
public static DataStream<Tuple4<String, String, Integer, Integer>> getFieldFromLog(DataStream<AccessLogRecord> logRecord) { DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog = logRecord.map(new MapFunction<AccessLogRecord, Tuple4<String, String, Integer, Integer>>() { @Override public Tuple4<String, String, Integer, Integer> map(AccessLogRecord accessLogRecord) throws Exception { LogParse parse = new LogParse();
String clientIpAddress = accessLogRecord.getClientIpAddress(); String dateTime = accessLogRecord.getDateTime(); String request = accessLogRecord.getRequest(); String formatDate = parse.parseDateField(dateTime); Tuple2<String, String> sectionIdAndArticleId = parse.parseSectionIdAndArticleId(request); if (formatDate == "" || sectionIdAndArticleId == Tuple2.of("", "")) {
return new Tuple4<String, String, Integer, Integer>("0.0.0.0", "0000-00-00 00:00:00", 0, 0); } Integer sectionId = (sectionIdAndArticleId.f0 == "") ? 0 : Integer.parseInt(sectionIdAndArticleId.f0); Integer articleId = (sectionIdAndArticleId.f1 == "") ? 0 : Integer.parseInt(sectionIdAndArticleId.f1); return new Tuple4<>(clientIpAddress, formatDate, sectionId, articleId); } }); return fieldFromLog; }
public static DataStream<AccessLogRecord> getAvailableAccessLog(DataStream<String> accessLog) { final LogParse logParse = new LogParse(); DataStream<AccessLogRecord> filterDS = accessLog.map(new MapFunction<String, AccessLogRecord>() { @Override public AccessLogRecord map(String log) throws Exception { return logParse.parseRecord(log); } }).filter(new FilterFunction<AccessLogRecord>() { @Override public boolean filter(AccessLogRecord accessLogRecord) throws Exception { return !(accessLogRecord == null); } }).filter(new FilterFunction<AccessLogRecord>() { @Override public boolean filter(AccessLogRecord accessLogRecord) throws Exception { return !accessLogRecord.getHttpStatusCode().equals("200"); } }); return filterDS; } }