Skip to content

Commit

Permalink
Merge pull request seaweedfs#4 from chrislusf/master
Browse files Browse the repository at this point in the history
sync
  • Loading branch information
hilimd committed Jul 23, 2020
2 parents 6ea4ce7 + 6f058b3 commit 437d187
Show file tree
Hide file tree
Showing 19 changed files with 798 additions and 312 deletions.
2 changes: 1 addition & 1 deletion k8s/seaweedfs/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
version: 1.85
version: 1.86
2 changes: 1 addition & 1 deletion k8s/seaweedfs/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
imageTag: "1.85"
imageTag: "1.86"
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
Expand Down
2 changes: 1 addition & 1 deletion other/java/client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.3.9</version>
<version>1.4.1</version>

<parent>
<groupId>org.sonatype.oss</groupId>
Expand Down
2 changes: 1 addition & 1 deletion other/java/client/pom.xml.deploy
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.3.9</version>
<version>1.4.1</version>

<parent>
<groupId>org.sonatype.oss</groupId>
Expand Down
2 changes: 1 addition & 1 deletion other/java/client/pom_debug.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.3.9</version>
<version>1.4.1</version>

<parent>
<groupId>org.sonatype.oss</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public static synchronized ByteBuffer request(int bufferSize) {
if (bufferSize < MIN_BUFFER_SIZE) {
bufferSize = MIN_BUFFER_SIZE;
}
LOG.debug("requested new buffer {}", bufferSize);
if (bufferList.isEmpty()) {
return ByteBuffer.allocate(bufferSize);
}
Expand All @@ -33,6 +34,7 @@ public static synchronized ByteBuffer request(int bufferSize) {
}

public static synchronized void release(ByteBuffer obj) {
obj.clear();
bufferList.add(0, obj);
}

Expand Down
14 changes: 10 additions & 4 deletions other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -15,6 +17,8 @@

public class SeaweedWrite {

private static final Logger LOG = LoggerFactory.getLogger(SeaweedWrite.class);

private static final SecureRandom random = new SecureRandom();

public static void writeData(FilerProto.Entry.Builder entry,
Expand All @@ -23,8 +27,10 @@ public static void writeData(FilerProto.Entry.Builder entry,
final long offset,
final byte[] bytes,
final long bytesOffset, final long bytesLength) throws IOException {
FilerProto.FileChunk.Builder chunkBuilder = writeChunk(
replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength);
synchronized (entry) {
entry.addChunks(writeChunk(replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength));
entry.addChunks(chunkBuilder);
}
}

Expand Down Expand Up @@ -58,6 +64,8 @@ public static FilerProto.FileChunk.Builder writeChunk(final String replication,
// cache fileId ~ bytes
SeaweedRead.chunkCache.setChunk(fileId, bytes);

LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);

return FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
.setOffset(offset)
Expand All @@ -71,10 +79,8 @@ public static void writeMeta(final FilerGrpcClient filerGrpcClient,
final String parentDirectory,
final FilerProto.Entry.Builder entry) throws IOException {

int chunkSize = entry.getChunksCount();
List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList());

synchronized (entry) {
List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList());
entry.clearChunks();
entry.addAllChunks(chunks);
filerGrpcClient.getBlockingStub().createEntry(
Expand Down
2 changes: 1 addition & 1 deletion other/java/hdfs2/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.3.9</seaweedfs.client.version>
<seaweedfs.client.version>1.4.1</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
2 changes: 1 addition & 1 deletion other/java/hdfs2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<seaweedfs.client.version>1.3.9</seaweedfs.client.version>
<seaweedfs.client.version>1.4.1</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWri
// System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit());
// System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
bufferToWrite.clear();
ByteBufferPool.release(bufferToWrite);
return null;
});
Expand Down
2 changes: 1 addition & 1 deletion other/java/hdfs3/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.3.9</seaweedfs.client.version>
<seaweedfs.client.version>1.4.1</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
2 changes: 1 addition & 1 deletion other/java/hdfs3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<seaweedfs.client.version>1.3.9</seaweedfs.client.version>
<seaweedfs.client.version>1.4.1</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWri
// System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit());
// System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
bufferToWrite.clear();
ByteBufferPool.release(bufferToWrite);
return null;
});
Expand Down
194 changes: 194 additions & 0 deletions unmaintained/diff_volume_servers/diff_volume_servers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package main

import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"io"
"math"
"os"
"strings"

"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
)

var (
serversStr = flag.String("volumeServers", "", "comma-delimited list of volume servers to diff the volume against")
volumeId = flag.Int("volumeId", -1, "a volume id to diff from servers")
volumeCollection = flag.String("collection", "", "the volume collection name")
grpcDialOption grpc.DialOption
)

/*
Diff the volume's files across multiple volume servers.
diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5
Example Output:
reference 127.0.0.1:8081
fileId volumeServer message
5,01617c3f61 127.0.0.1:8080 wrongSize
*/
func main() {
flag.Parse()

util.LoadConfiguration("security", false)
grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")

vid := uint32(*volumeId)
servers := strings.Split(*serversStr, ",")
if len(servers) < 2 {
glog.Fatalf("You must specify more than 1 server\n")
}
var referenceServer string
var maxOffset int64
allFiles := map[string]map[types.NeedleId]needleState{}
for _, addr := range servers {
files, offset, err := getVolumeFiles(vid, addr)
if err != nil {
glog.Fatalf("Failed to copy idx from volume server %s\n", err)
}
allFiles[addr] = files
if offset > maxOffset {
referenceServer = addr
}
}

same := true
fmt.Println("reference", referenceServer)
fmt.Println("fileId volumeServer message")
for nid, n := range allFiles[referenceServer] {
for addr, files := range allFiles {
if addr == referenceServer {
continue
}
var diffMsg string
n2, ok := files[nid]
if !ok {
if n.state == stateDeleted {
continue
}
diffMsg = "missing"
} else if n2.state != n.state {
switch n.state {
case stateDeleted:
diffMsg = "notDeleted"
case statePresent:
diffMsg = "deleted"
}
} else if n2.size != n.size {
diffMsg = "wrongSize"
} else {
continue
}
same = false

// fetch the needle details
var id string
var err error
if n.state == statePresent {
id, err = getNeedleFileId(vid, nid, referenceServer)
} else {
id, err = getNeedleFileId(vid, nid, addr)
}
if err != nil {
glog.Fatalf("Failed to get needle info %d from volume server %s\n", nid, err)
}
fmt.Println(id, addr, diffMsg)
}
}
if !same {
os.Exit(1)
}
}

const (
stateDeleted uint8 = 1
statePresent uint8 = 2
)

type needleState struct {
state uint8
size uint32
}

func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) {
var idxFile *bytes.Reader
err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
copyFileClient, err := vs.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: v,
Ext: ".idx",
CompactionRevision: math.MaxUint32,
StopOffset: math.MaxInt64,
Collection: *volumeCollection,
})
if err != nil {
return err
}
var buf bytes.Buffer
for {
resp, err := copyFileClient.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return err
}
buf.Write(resp.FileContent)
}
idxFile = bytes.NewReader(buf.Bytes())
return nil
})
if err != nil {
return nil, 0, err
}

var maxOffset int64
files := map[types.NeedleId]needleState{}
err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
if offset.IsZero() || size == types.TombstoneFileSize {
files[key] = needleState{
state: stateDeleted,
size: size,
}
} else {
files[key] = needleState{
state: statePresent,
size: size,
}
}
if actual := offset.ToAcutalOffset(); actual > maxOffset {
maxOffset = actual
}
return nil
})
if err != nil {
return nil, 0, err
}
return files, maxOffset, nil
}

func getNeedleFileId(v uint32, nid types.NeedleId, addr string) (string, error) {
var id string
err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
resp, err := vs.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
VolumeId: v,
NeedleId: uint64(nid),
})
if err != nil {
return err
}
id = needle.NewFileId(needle.VolumeId(v), resp.NeedleId, resp.Cookie).String()
return nil
})
return id, err
}
15 changes: 15 additions & 0 deletions weed/pb/volume_server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ service VolumeServer {
rpc Query (QueryRequest) returns (stream QueriedStripe) {
}

rpc VolumeNeedleStatus (VolumeNeedleStatusRequest) returns (VolumeNeedleStatusResponse) {
}
}

//////////////////////////////////////////////////
Expand Down Expand Up @@ -463,3 +465,16 @@ message QueryRequest {
message QueriedStripe {
bytes records = 1;
}

message VolumeNeedleStatusRequest {
uint32 volume_id = 1;
uint64 needle_id = 2;
}
message VolumeNeedleStatusResponse {
uint64 needle_id = 1;
uint32 cookie = 2;
uint32 size = 3;
uint64 last_modified = 4;
uint32 crc = 5;
string ttl = 6;
}
Loading

0 comments on commit 437d187

Please sign in to comment.