amazons3-apacheflinkaws3sink是否需要Hadoop进行 local test?


0

我对apacheflink还比较陌生,我正在尝试创建一个简单的项目,该项目生成一个文件到aws3存储桶。 root据文档,看起来我需要安装Hadoop才能做到这一点。

如何设置 local环境以允许我 test此功能?我已经在 local安装了apacheflink和Hadoop。我已经给 core添加了必要的更改- site.xml配置Hadoop,并将我的Hadoop_CONF路径添加到 Flink·亚姆配置。当我试图通过flinkui在 local提交我的 job时,我总是得到一个错误

2016-12-29 16:03:49861信息org.apache.flink网站.util.NetUtils工具-由于错误:地址已在使用中,无法在端口6123上分配

我假设我的环境是如何建立起来的,我觉得我遗漏了一些东西。有可能在 local进行吗?任何帮助都将不胜感激。

2 答案


0

虽然您需要Hadoop库,但不必安装Hadoop才能在 local运行并写入S3。我只是碰巧通过编写一个基于Avro模式的Parquet输出并将SpecificRecord生成到S3中进行了尝试。我正在通过SBT和intellijidea在 local运行以下代码的一个版本。所需零件:

1) 让下面的文件指定所需的Hadoop属性(注意:不建议定义AWS访问密钥/密钥)。最好在EC2实例上运行,该实例具有适当的IAM角色来读/写s3bucket。但需要 local test)

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

&lt;!-- Comma separated list of local directories used to buffer
     large results prior to transmitting them to S3. --&gt;
&lt;property&gt;
    &lt;name&gt;fs.s3a.buffer.dir&lt;/name&gt;
    &lt;value&gt;/tmp&lt;/value&gt;
&lt;/property&gt;

&lt;!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --&gt;
&lt;property&gt;
    &lt;name&gt;fs.s3a.access.key&lt;/name&gt;
    &lt;value&gt;YOUR_ACCESS_KEY&lt;/value&gt;
&lt;/property&gt;

&lt;!-- set your AWS access key --&gt;
&lt;property&gt;
    &lt;name&gt;fs.s3a.secret.key&lt;/name&gt;
    &lt;value&gt;YOUR_SECRET_KEY&lt;/value&gt;
&lt;/property&gt;

</configuration>

2) 进口:

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job

import org.apache.parquet.avro.AvroParquetOutputFormat

3) Flink代码使用HadoopOutputFormat和上述配置:

    val events: DataSet[(Void, EventOnlyRecord)] = ...

val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
val outputJob = Job.getInstance

//Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
//so key is Void, value of type T - EventOnlyRecord in this case
val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
  outputFormat,
  outputJob
)

val outputConfig = outputJob.getConfiguration
outputConfig.addResource(hadoopConfig)
val outputPath = new Path("s3://&lt;bucket&gt;/&lt;dir-prefix&gt;")
FileOutputFormat.setOutputPath(outputJob, outputPath)
AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

events.output(hadoopOutputFormat)

env.execute

...

def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = {
  val hadoopConfig = new HadoopConfiguration()
  hadoopConfig.addResource(new Path(hadoodConfigPath))
  hadoopConfig
}

4) 生成依赖项和使用的版本:

    val awsSdkVersion = "1.7.4"
    val hadoopVersion = "2.7.3"
    val flinkVersion = "1.1.4"

val flinkDependencies = Seq(
  ("org.apache.flink" %% "flink-scala" % flinkVersion),
  ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
)

val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1")
)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)

对S3使用WriteText的编辑:

1) 创建一个具有文件 core的Hadoop配置目录(将其称为Hadoop conf dir)- site.xml在里面。

例如:

mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml

#content of core-site.xml
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

&lt;!-- Comma separated list of local directories used to buffer
     large results prior to transmitting them to S3. --&gt;
&lt;property&gt;
    &lt;name&gt;fs.s3a.buffer.dir&lt;/name&gt;
    &lt;value&gt;/tmp&lt;/value&gt;
&lt;/property&gt;

&lt;!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --&gt;
&lt;property&gt;
    &lt;name&gt;fs.s3a.access.key&lt;/name&gt;
    &lt;value&gt;YOUR_ACCESS_KEY&lt;/value&gt;
&lt;/property&gt;

&lt;!-- set your AWS access key --&gt;
&lt;property&gt;
    &lt;name&gt;fs.s3a.secret.key&lt;/name&gt;
    &lt;value&gt;YOUR_SECRET_KEY&lt;/value&gt;
&lt;/property&gt;

</configuration>

2) 用flink文件创建一个目录(将其称为flink conf dir)-确认.yaml在里面。

例如:

mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml

//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config

3) 编辑用于运行S3 Flink job的IntelliJ运行配置-运行-编辑配置-并添加以下环境变量:

FLINK_CONF_DIR and set it to your flink-conf-dir

Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config

4) 使用该环境变量集运行代码:

events.writeAsText("s3://<bucket>/<prefix-dir>")

env.execute


0

为了在 local运行我的flink job,我必须执行以下操作:

1-将flink-s3-fs-hadoop-1.9.1.jar添加到我的flink/plugins/flink-s3-fs-hadoop目录

2-修改的flink/conf/flink-确认.yaml包括

我有 core- site.xml文件在hadoop config文件夹中,但它不包含任何配置,因此fs.hdfs.hadoopconf可能不需要。


我来回答

写文章

提问题

面试题