Skip to content

Commit

Permalink
[FLINK-26014][docs] Add documentation for how to use the working dire…
Browse files Browse the repository at this point in the history
…ctory on K8s

This closes apache#18808.
  • Loading branch information
tillrohrmann committed Feb 17, 2022
1 parent d893292 commit 965ca2d
Showing 1 changed file with 104 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@ To use Reactive Mode on Kubernetes, follow the same steps as for [deploying a jo

Once you have deployed the *Application Cluster*, you can scale your job up or down by changing the replica count in the `flink-taskmanager` deployment.

### Enabling Local Recovery Across Pod Restarts

In order to speed up recoveries in case of pod failures, you can leverage Flink's [working directory]({{< ref "docs/deployment/resource-providers/standalone/working_directory" >}}) feature together with local recovery.
If the working directory is configured to reside on a persistent volume that gets remounted to a restarted TaskManager pod, then Flink is able to recover state locally.
With the [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/), Kubernetes gives you the exact tool you need to map a pod to a persistent volume.
Deploying TaskManagers as a StatefulSet, allows you to configure a volume claim template that is used to mount persistent volumes to the TaskManagers.
Additionally, you need to configure a deterministic `taskmanager.resource-id`.
A suitable value is the [pod name](https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/), that you expose using environment variables.
For an example StatefulSet configuration take a look at the [appendix](#local-recovery-enabled-taskmanager-statefulset).
{{< top >}}
Expand Down Expand Up @@ -771,4 +781,98 @@ spec:
path: /host/path/to/job/artifacts
```
### Local Recovery Enabled TaskManager StatefulSet
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
state.backend.local-recovery: true
process.taskmanager.working-dir: /pv
---
apiVersion: v1
kind: Service
metadata:
name: taskmanager-hl
spec:
clusterIP: None
selector:
app: flink
component: taskmanager
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: flink-taskmanager
spec:
serviceName: taskmanager-hl
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
securityContext:
runAsUser: 9999
fsGroup: 9999
containers:
- name: taskmanager
image: apache/flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args: ["taskmanager", "-Dtaskmanager.resource-id=$(POD_NAME)"]
ports:
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query-state
- containerPort: 6121
name: metrics
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
- name: pv
mountPath: /pv
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
volumeClaimTemplates:
- metadata:
name: pv
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 50Gi
```
{{< top >}}

0 comments on commit 965ca2d

Please sign in to comment.