Spark を活用する:ビッグデータアプリケーション用の高速インメモリコンピューティング

Clouderaのプライベートトレーニング

投稿日: 2014/03/07

 

本記事は、弊社パートナーであるDatabricks様による寄稿記事を翻訳したものです。原文についてはこちらをご覧ください。

 


 

Apache Spark (incubating)をサポートするDatabricksに、以下のゲスト記事を寄稿していただきました。ClouderaとDatabricksはCDH内でSparkを提供しサポートすると最近発表しました。今後、Sparkの内部アーキテクチャやSpark + CDHの使い方などをブログ記事で紹介する予定です。

Apache Hadoop は、非常に低コストで膨大な量のデータを格納し処理できるようにして、ビッグデータ処理に革命をもたらしました。MapReduceは、システムログの解析から、ETLの実行、Webインデックスの作成、そしてパーソナルリコメンデーションシステムの実行基盤に至るまで、複雑なバッチアプリケーションを実装するための理想的なプラットフォームであることは既に証明されています。しかし、耐障害性とワンパス計算モデルを提供するための永続的ストレージへの依存により、MapReduceは低レイテンシのアプリケーションや、機械学習やグラフアルゴリズムなどで使う繰り返しの計算処理には適しておりません。

Apache Sparkは劇的にパフォーマンスと使いやすさを向上させながら、MapReduceの計算モデルを一般化し、これらの制限に対処しています。

 

Sparkで迅速かつ簡単にビッグデータ処理

基本的にSparkはMapper、Reducer、JOIN、GROUP BY、フィルタなどの任意の演算子によってアプリケーションを書けるようにするプログラミングモデルです。この組み合わせにより、繰り返しの機械学習、ストリーミング、複雑なクエリ、そしてバッチなど幅広い領域を表現するのが簡単になります。

また、Sparkは、演算子の各々が生成するデータを追跡し、 アプリケーションが確実メモリ内にこのデータを保存できるようにします。これによりアプリケーションは高コストなディスクアクセスを避けることができますので、これがSparkの性能の鍵となります。下の図に示すように、この機能により以下のことが可能になります。

・メモリ内の作業データセットをキャッシュし、メモリの速度で計算を行うことで、 低レイテンシの計算 、および
・後続の繰り返し処理の共有データをメモリ上に保持するか、繰り返し同じデータセットにアクセスすることによる、効率的な反復アルゴリズム

 

 

Sparkの使いやすさは、ユーザーがたくさんのMapとReduce処理に縛られずにアプリケーションを構築できるというその一般的なプログラミングモデルから来ています。Sparkの並列プログラムは逐次プログラムに非常によく似ていて、それにより開発とレビューを簡単にすることができます。最後に、Sparkによりユーザーは単一のアプリケーションでバッチ処理、インタラクティブ処理、ストリーミングジョブを簡単に組み合わせることができます。その結果、SparkジョブはHadoopジョブに比べて1/2から1/10のコード量で最大で100倍もの速度で実行することができます。

 

高度なデータ解析とデータサイエンスのためのSparkの利用

 

インタラクティブなデータ解析

Sparkの最も便利な機能の一つはインタラクティブシェルです。これによりユーザーはすぐにSparkの性能を試すことができます。IDEもコードコンパイルも必要ありません。シェルは、データをインタラクティブに探索したり、開発中のアプリケーションの一部をテストしたりするための主要なツールとして使うことができます。

下のスクリーンショットは、ユーザーがファイルをロードし、「Holiday」を含む行の数をカウントするSpark Pythonのシェルを示しています。

 

 

この例に示すように、SparkはHDFSからのデータを読み書きすることができます。このように、Hadoopの利用者はSparkをインストールしたらすぐにHDFSのデータ分析を開始することができます。そして、メモリ内のデータセットをキャッシュすることで、ユーザーが対話的に様々な種類の複雑な計算を行うことができます!

Sparkは、スタンドアロンアプリケーションのためのScalaのシェル、およびJava、Scala、およびPythonでAPIを提供します。

 

より高速なバッチ

Sparkの最初期の実装の中には、既存のMapReduceアプリケーションのパフォーマンスを改善する方法に焦点を当てているものもありました。MapReduceは実際には汎用的な処理フレームワークであり、コアHadoopで中で最もよく知られた実装だけに限らないことを忘れないでください。SparkもMapReduceを提供し、そしてSparkはメモリを効率よく使うことができますので(必要があれば障害から復旧するための系統を使うとしても)、繰り返し処理のプログラムでキャッシュを使わなくても、SparkのMapReduceの実装のいくつかはHadoopのMapReduceと比較して単純に高速です。

以下の例は、MapReduceの最も有名な例であるワードカウントのSpark実装を示しています。ここで、Sparkは演算子チェーンをサポートしていることがわかります。これは、データのフィルタリングを複雑なMapReduceジョブの実行前に行うなどの、データの前処理や後処理を行うときに非常に便利です。

 

val file = sc.textFile("hdfs://.../pagecounts-*.gz")
val counts = file.flatMap(line => line.split(" "))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://.../word-count")

 

Sparkのバッチ処理の能力は、現実世界のシナリオにおいて証明されています。ある非常に巨大なシリコンバレーのインターネット企業はモデルのトレーニングパイプラインにおける特徴量抽出を実装した1つのMRジョブをそのまま移植しただけで3倍の速度改善を図ることができました。

 

反復アルゴリズム

Sparkは、ユーザーやアプリケーションが明示的にcache()オペレーションを呼び出すことにより、データセットをキャッシュすることができます。これはアプリケーションがディスクではなくRAMからデータにアクセスできるということを意味していて、これにより同じデータセットに繰り返しアクセスする反復アルゴリズムの性能を劇的に改善することができます。すべての機械学習とグラフアルゴリズムは、本質的に反復的であるように、このユースケースは、アプリケーションの重要な領域をカバーしています。

世界最大級のインターネット企業の2社が、広告ターゲティングとコンテンツリコメンデーションを提供するためにSparkによる効率的な反復実行を活用しています。ロジスティック回帰のような機械学習アルゴリズムは従来のHadoopベースの実装に比べて100倍高速に実行できます(右の図をご覧ください)。一方協調フィルタリングやADMM(alternating direction method of multipliers) のようなアルゴリズムでは15倍の高速化にとどまりました。

以下の例は、多次元特徴空間内の点の二組を分離する最良な超平面を見つけるためにロジスティック回帰を使用しています。MapReduceの中で、各イテレーションではディスクからデータを読み込んで巨大なオーバーヘッドが発生している一方、Sparkではキャッシュされたデータセット「points」がメモリから繰り返しアクセスされている点に注意してください。

 

val points = sc.textFile("...").map(parsePoint).cache()
var w = Vector.random(D) //current separating plane
for (i <- 1 to ITERATIONS) {
    val gradient = points.map(p =>
      (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
    ).reduce(_ + _)
    w -= gradient
  }
  println("Final separating plane: " + w)

 

リアルタイムのストリーム処理

自由に使える低レイテンシなデータ解析システムが手元にあれば、そのエンジンをライブデータストリーム処理に拡張するのは当然のことです。Sparkにはストリーム操作のためのAPIがあり、正確に1回きりのセマンティクスとステートフルな演算子の完全復旧を提供します。ストリームの処理に同じSpark APIを提供していて、通常のSparkアプリケーションコードを再利用できるという明確な利点もあります。

以下のコードは、ハッシュタグで始まる単語のワードカウントを10秒ごとのデータに対してフィルタ処理するという、シンプルなネットワークストリーム処理のジョブを示しています。前のワードカウントの例と比較してみれば、ほぼ全く同じ同じコードが使用されていることがわかりますが、今回はライブデータストリームを処理しています。

 

val ssc = new StreamingContext(args(0), "NetworkHashCount", Seconds(10),
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).filter(_.startsWith("#"))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()

 

Spark Streaming APIがリリースされてからまだ1年経っていませんが、ユーザーはそのAPIを使ったアプリケーションを本番環境に導入することでシステムログを集約したステートフルなデータに対し監視とアラートを提供し、わずか数秒のレイテンシのみで非常に高速な処理を達成しています。

 

意思決定の迅速化

多くの企業は、推薦システム、広告のターゲット設定、または予測分析の形で、ユーザーの意思決定あるいは意思決定の促進のために、ビッグデータを使用しています。どんな意思決定においても重要な要素は、レイテンシです。すなわち、入力データが利用可能になってから意思決定を行うまでにかかる時間のことです。意思決定の待ち時間を減らすことは大幅にその有効性を高め、最終的には企業の投資対効果(ROI)を増やすことができます。これらの決定の多くは(例えば、機械学習や統計的アルゴリズムなどの)複雑な計算に基づいているので、Sparkが意思決定をスピードアップするために理想的であることがわかります。

当然のことながら、Sparkは、待ち時間を低減するためだけでなく、決定の質を向上させるために利用されていいます。例えば、広告ターゲティングから、インターネット上での映像配信の品質を向上させるまで幅広い範囲で適用されています。

 

統一されたパイプライン

今日のビッグデータの多くはMapReduceのみならず、ストリーミング、バッチ、インタラクティブ処理のフレームワークを統合することによって展開されています。多くのシステムをSparkと置き換えることによって、ユーザーはデータ処理パイプラインの複雑さを劇的に低減させることができます。

例えば、今日、多くの企業はMapReduceを使ってレポートを生成して履歴クエリに対して回答し、別のシステムを構築してリアルタイムに重要なメトリクスを追跡するためのストリーム処理を行ったりしています。この方法は、2つの異なる計算モデルのアプリケーションを開発するだけでなく、2つの異なるシステムの維持管理も必要とします。また、2つのスタックで生成される結果に一貫性があることを保証する必要性も出てくるでしょう(例えば、ストリーミングアプリケーションとMapReduceによって計算されたカウンタが同じであるかどうか)。

最近では、ユーザーが履歴レポートを提供するためのバッチ処理だけでなく、ストリーム処理を実装するためにSparkを利用しています。これによりデプロイと保守を簡素化するだけでなく、アプリケーション開発を劇的にシンプルにすることができます。同じコードで処理しているのであれば、リアルタイムと履歴のメトリクスの一貫性を維持することは問題にはなりません。 統一化の最後の利点は、異なるシステム間でのデータ移動が不要になることによる性能向上です。一旦メモリ上に保存されれば、そのデータはストリーミング処理と履歴(あるいはインタラクティブな)クエリとの間で共有することができます。

 

あなたのターン: 始めましょう

Sparkによってパワフルなビッグデータアプリケーションを作ることが非常に簡単になります。あなたが既に持っているHadoopとプログラミングスキルにより、数分で生産的にデータを活用することができるでしょう。やるなら今です。

ダウンロード: http://spark.incubator.apache.org/downloads.html
クイックスタート: http://spark.incubator.apache.org/docs/latest/quick-start.html

 

関連記事

Apache Spark (製品紹介ページ)

CDH5とCloudera Manager 5がリリースされました!

ImpalaとHiveの戦略について

 

 


Contact us

製品やサービス、サポート、トレーニングについてのより詳しい情報は、下記までお問い合わせください。

Cloudera全般(日本語)
info-jp@cloudera.com
二ユースレター購読 (日本語)
Clouderaからの日本語での二ユースレター購読希望の方は
info-jp@cloudera.com
件名: ML_SUBSCRIBE
でメールをお送りください