Feat/fetch p2p network info dfdaemon#5
Feat/fetch p2p network info dfdaemon#5wangforthinker wants to merge 39 commits intofeat/antsystemfrom
Conversation
…work if need update. fix bug
add some log for test.
fix not report to uploader.
fix requestManager get wrong front urls.
Feat/support local task reload
add heart beat for peer
79ef05b to
d92072a
Compare
e8ae41e to
f241659
Compare
| "sync" | ||
| ) | ||
|
|
||
| // finiteQueue provides a circle queue with capacity, and it will weed out the earlier item if full |
There was a problem hiding this comment.
pkg/queue 下面有个有限队列的实现,跟这个区别在于队列满时会阻塞。可否考虑将本实现也放到pkg/queue下面。
另外我觉得应该叫循环队列,但是我从实现上看更像个cache。
There was a problem hiding this comment.
嗯 这是一个有限长度的循环队列。已放在pkg/queue.
| type finiteQueue struct { | ||
| sync.Mutex | ||
| capacity int | ||
| head *itemNode |
| uploaderAPI api.UploaderAPI | ||
|
|
||
| // postNotifyUploader should be called after notify the local uploader finish | ||
| postNotifyUploader func(req *api.FinishTaskRequest) |
There was a problem hiding this comment.
dfget/core/uploader/uploader.go
Outdated
| ticker := time.NewTicker(5 * time.Millisecond) | ||
| defer ticker.Stop() | ||
| timeout := time.After(233 * time.Millisecond) | ||
| timeout := time.After(10 * time.Second) |
| logrus.Infof("in downloadPiece by returnSrc, url: %s, header: %v, err: %d", pc.pieceTask.Url, header, err) | ||
| }else{ | ||
| downloadRequest := pc.createDownloadRequest() | ||
| downloadRequest.PieceRange = fmt.Sprintf("0-%d", downloadRequest.PieceSize + config.PieceMetaSize) |
| buf := make([]byte, 128) | ||
| binary.BigEndian.PutUint32(buf, uint32((pieceSize)|(pieceSize)<<4)) | ||
| content.Write(buf[:config.PieceHeadSize]) | ||
| defer content.Write([]byte{config.PieceTailChar}) |
There was a problem hiding this comment.
直接添加分片头尾信息有问题,因为分片的md5值是包含了:piece head + piece data + piece tail。
这种情况下考虑使用 NewLimitReaderWithLimiterAndMD5Sum ,下载前后将分片头尾信息放到md5计算中。
| func (csw *ClientStreamWriter) Read(p []byte) (n int, err error) { | ||
| n, err = csw.limitReader.Read(p) | ||
| csw.alreadyReadSize += int64(n) | ||
| if csw.alreadyReadSize == csw.expectReadSize { |
There was a problem hiding this comment.
这里是为了做什么?csw.alreadyReadSize += int64(n) 结果可能也大于 csw. expectReadSize
There was a problem hiding this comment.
当读到的内容跟预计的一样多时,主动close(pipewriter)。 不然请求方不会收到EOF。
dfdaemon/transport/transport.go
Outdated
| // fix resource release | ||
| func (roundTripper *DFRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { | ||
| if req.Header.Get("x-nydus-proxy-healthcheck") != "" { | ||
| if roundTripper.config.Extreme.SpecKeyOfDirectRet != "" { |
There was a problem hiding this comment.
这一段改造可否独立个函数出来,因为跟之前的逻辑基本都不一样,可在这里判断进入不同的处理函数中。
0364ad6 to
4d1a730
Compare
pkg/queue/circle_queue.go
Outdated
| // remove the earliest item | ||
| i := q.internalRemoveTail() | ||
| if i != nil { | ||
| delete(q.itemMap, key) |
There was a problem hiding this comment.
The deleted element is not the tail of the list, it may delete nothing actually.
The stored value in list.Element may be like this:
type elementData struct {
key string
data interface{}
}So delete the value from map can be written:
delete(q.itemMap, i.Value.(*elementData).key)|
|
||
| // if pieceRange == "" means all Pieces of file | ||
| func (sm *SchedulerManager) SchedulerByTaskID(ctx context.Context, taskID string, srcCid string, pieceRange string, pieceSize int32) ([]*Result, error) { | ||
| sm.Lock() |
There was a problem hiding this comment.
这个锁的粒度似乎太大了,所有任务都需要串行调度,而且对于peer负载的更新也依赖于此锁
There was a problem hiding this comment.
实际测试下来,调度所占耗时不到1%,暂时不影响。
| PeerInfo: node.Basic, | ||
| Generation: sm.generation, | ||
| StartDownload: func(peerID string, generation int64) { | ||
| sm.downloadStartCh <- notifySt{peerID: peerID, generation: generation} |
dfdaemon/proxy/proxy.go
Outdated
| WithRegistryMirror(c.RegistryMirror), | ||
| WithStreamDownloaderFactory(func() downloader.Stream { | ||
| // dfget.NewGetter(c.DFGetConfig()) | ||
| dfget.NewGetter(c.DFGetConfig()) |
4d1a730 to
5b9b471
Compare
Ⅰ. Describe what this PR did
Ⅱ. Does this pull request fix one issue?
Ⅲ. Why don't you add test cases (unit test/integration test)? (你真的觉得不需要加测试吗?)
Ⅳ. Describe how to verify it
Ⅴ. Special notes for reviews