forked from GoogleCloudPlatform/DataflowTemplates
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TextImportPipeline.java
193 lines (159 loc) · 7.66 KB
/
TextImportPipeline.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/*
* Copyright (C) 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.spanner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
/**
* Text files to Cloud Spanner Import pipeline. This pipeline ingests CSV and other types of
* delimited data from GCS and writes data to a Cloud Spanner database table. Each row from the
* input CSV file will be applied to Cloud Spanner with an InsertOrUpdate mutation, so this can be
* used both to populate new rows and to update columns of existing rows.
*
* <p>You can specify a column delimiter other than a comma. Also make sure to use a field
* qualifier, such as a double quote, to escape the delimiter if it appears in a value.
*
* <p>Text file must NOT have a header.
*
* <p>Example Usage: Here is CSV sample data simulating an account table:
* 1,sample_user_1,true,2018-01-01,2018-01-01T12:30:00Z
*
* <p>Schema file must have all column and type definition in one line. Schema file must use the
* data type names of Cloud Spanner. We currently support the following Cloud Spanner data types: -
* BOOL - DATE - FLOAT64 - INT64 - STRING - TIMESTAMP
*
* <p>Input format properties:
*
* <ul>
*   <li>\\N in a source column is treated as a NULL value when writing to Cloud Spanner.
*   <li>If you need to escape characters, use the "fieldQualifier" parameter to tell the
*       pipeline which qualifier the file uses, e.g., you can put all values inside double
*       quotes like "123", "john", "true".
*   <li>See the implementation of parseRow() (not part of this file) for the values that are
*       accepted for each data type.
* </ul>
*
* <p>NOTE: BYTES, ARRAY, STRUCT types are not supported.
*
* <p>Example schema file for the CSV file above:
*
* <pre>Id:INT64,Username:STRING,Active:BOOL,CreateDate:DATE,ModifyTime:TIMESTAMP</pre>
*
* <p>Here is the DDL for creating Cloud Spanner table:
*
* <pre>CREATE TABLE example_table
* ( Id INT64, Username STRING(MAX), Active BOOL, CreateDate DATE, ModifyTime TIMESTAMP )
* PRIMARY KEY(Id)
* </pre>
*
* <pre>
* {@code mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.spanner.TextImportPipeline \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/staging \
* --tempLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/temp \
* --runner=DataflowRunner \
* --importManifest=gs://IMPORT_MANIFEST_FILE \
* --instanceId=SPANNER_INSTANCE_ID \
* --databaseId=DATABASE_ID \
* --columnDelimiter=',' \
* --fieldQualifier='"'
* }
* </pre>
*/
public class TextImportPipeline {

  /**
   * Command-line / template options understood by {@link TextImportPipeline}.
   *
   * <p>Every option except {@code waitUntilFinish} is exposed as a {@link ValueProvider}.
   */
  public interface Options extends PipelineOptions {

    /** Returns the ID of the Cloud Spanner instance to write to. */
    @Description("Instance ID to write to Spanner")
    ValueProvider<String> getInstanceId();

    void setInstanceId(ValueProvider<String> value);

    /** Returns the ID of the Cloud Spanner database to write to. */
    @Description("Database ID to write to Spanner")
    ValueProvider<String> getDatabaseId();

    void setDatabaseId(ValueProvider<String> value);

    /** Returns the Spanner endpoint; defaults to the batch Spanner host. */
    @Description("Spanner host. The default value is https://batch-spanner.googleapis.com.")
    @Default.String("https://batch-spanner.googleapis.com")
    ValueProvider<String> getSpannerHost();

    void setSpannerHost(ValueProvider<String> value);

    /** Returns the location of the JSON-encoded text import manifest file. */
    @Description("Text Import Manifest file, storing a json-encoded {@link importManifest} object.")
    ValueProvider<String> getImportManifest();

    void setImportManifest(ValueProvider<String> value);

    /** Returns the character separating columns in the data files; defaults to a comma. */
    @Description("Column delimiter of the data files. The default value is comma.")
    @Default.Character(',')
    ValueProvider<Character> getColumnDelimiter();

    void setColumnDelimiter(ValueProvider<Character> value);

    /** Returns the character used to qualify (quote) fields; defaults to a double quote. */
    @Description(
        "Field qualifier used by the source file. Field qualifier should be used when character"
            + " needs to be escaped. The default value is double quote.")
    @Default.Character('"')
    ValueProvider<Character> getFieldQualifier();

    void setFieldQualifier(ValueProvider<Character> value);

    /** Returns whether input lines carry trailing delimiters; defaults to {@code true}. */
    @Description("If true, the lines has trailing delimiters. The default value is true.")
    @Default.Boolean(true)
    ValueProvider<Boolean> getTrailingDelimiter();

    void setTrailingDelimiter(ValueProvider<Boolean> value);

    /** Returns the escape character; no default is declared for this option. */
    @Description(
        "The escape character. The default value is NULL (not using the escape character).")
    ValueProvider<Character> getEscape();

    void setEscape(ValueProvider<Character> value);

    /** Returns the string standing in for NULL values; no default is declared for this option. */
    @Description(
        "The string that represents the NULL value. The default value is null (not using the null"
            + " string).")
    ValueProvider<String> getNullString();

    void setNullString(ValueProvider<String> value);

    /** Returns the pattern used to parse date columns; no default is declared for this option. */
    @Description(
        "The format used to parse date columns. By default, the pipeline will try to parse the"
            + " date columns as \"yyyy-MM-dd[' 00:00:00']\" (e.g., 2019-01-31, or 2019-01-31"
            + " 00:00:00). If your data format is different, please specify the format using the"
            + " {@link DateTimeFormatter} patterns.")
    ValueProvider<String> getDateFormat();

    void setDateFormat(ValueProvider<String> value);

    /**
     * Returns the pattern used to parse timestamp columns; no default is declared for this
     * option.
     */
    @Description(
        "The format used to parse timestamp columns. If the timestamp is a long integer, then it's"
            + " treated as Unix epoch (the microsecond since 1970-01-01T00:00:00.000Z. Otherwise,"
            + " it parsed as a string using the {@link DateTimeFormatter#ISO_INSTANT} format. For"
            + " other cases, please specify you own pattern string, e.g., \"MMM dd yyyy"
            + " HH:mm:ss.SSSVV\" for timestamp in the form of \"Jan 21 1998 01:02:03.456+08:00\"."
            + " Please refer to {@link DateTimeFormatter} for more details.")
    ValueProvider<String> getTimestampFormat();

    void setTimestampFormat(ValueProvider<String> value);

    /** Returns whether {@code main} should block until the job finishes; defaults to true. */
    @Description("If true, wait for job finish. The default value is true.")
    @Default.Boolean(true)
    boolean getWaitUntilFinish();

    void setWaitUntilFinish(boolean value);
  }

  /**
   * Entry point: parses and validates the options, builds the import pipeline, runs it, and
   * optionally blocks until the launched job completes.
   *
   * @param args command-line arguments, parsed into {@link Options}
   */
  public static void main(String[] args) {
    final Options importOptions =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    final Pipeline pipeline = Pipeline.create(importOptions);

    final SpannerConfig targetDatabase = spannerConfigFor(importOptions);
    pipeline.apply(new TextImportTransform(targetDatabase, importOptions.getImportManifest()));

    final PipelineResult launched = pipeline.run();

    if (!importOptions.getWaitUntilFinish()) {
      return;
    }
    // A non-null template location means this run only generates a template, which does not
    // start a Dataflow job, so there is nothing to wait for.
    if (importOptions.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      return;
    }
    launched.waitUntilFinish();
  }

  /** Builds the Spanner connection settings from the host, instance and database options. */
  private static SpannerConfig spannerConfigFor(Options importOptions) {
    return SpannerConfig.create()
        .withHost(importOptions.getSpannerHost())
        .withInstanceId(importOptions.getInstanceId())
        .withDatabaseId(importOptions.getDatabaseId());
  }
}